[REEF-2015] Adds Azure Batch job submission. Submits the job to Azure Batch.
JIRA: [REEF-2015](https://issues.apache.org/jira/browse/REEF-2015) Pull Request: Closes # 1457 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/de1724c8 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/de1724c8 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/de1724c8 Branch: refs/heads/REEF-335 Commit: de1724c811097dcfb9f8b073a6e90eac8704046a Parents: d243aa2 Author: Tyson Condie <[email protected]> Authored: Fri May 4 15:28:32 2018 -0700 Committer: Doug Service <[email protected]> Committed: Wed May 9 00:27:21 2018 +0000 ---------------------------------------------------------------------- lang/common/proto/bridge/ClientProtocol.proto | 42 +++- lang/java/reef-bridge-proto-java/pom.xml | 25 +++ .../bridge/client/DriverServiceLauncher.java | 49 +++-- .../LocalDriverServiceRuntimeLauncher.java | 5 +- .../YarnDriverServiceRuntimeLauncher.java | 14 +- .../YarnDriverRuntimeConfigurationProvider.java | 27 +-- .../driver/client/grpc/DriverClientService.java | 1 + .../bridge/driver/launch/IDriverLauncher.java | 33 +++ .../AzureBatchDriverConfigurationProvider.java | 67 +++++++ .../launch/azbatch/AzureBatchLauncher.java | 110 ++++++++++ .../driver/launch/azbatch/package-info.java | 22 ++ .../driver/launch/local/LocalLauncher.java | 79 ++++++++ .../driver/launch/local/package-info.java | 22 ++ .../reef/bridge/driver/launch/package-info.java | 22 ++ .../bridge/driver/launch/yarn/YarnLauncher.java | 93 +++++++++ .../bridge/driver/launch/yarn/package-info.java | 22 ++ .../service/DriverServiceConfiguration.java | 79 ++++++++ .../DriverServiceConfigurationProviderBase.java | 148 -------------- .../driver/service/DriverStatusHTTPHandler.java | 200 +++++++++++++++++++ .../IDriverServiceConfigurationProvider.java | 7 +- .../driver/service/grpc/GRPCDriverService.java | 31 ++- .../GRPCDriverServiceConfigurationProvider.java | 127 +++++++++++- .../parameters/HTTPStatusAlarmInterval.java | 33 +++ 
.../parameters/HTTPStatusNumberOfRetries.java | 33 +++ .../reef/bridge/examples/hello/HelloREEF.java | 4 +- .../JobSubmissionDirectoryPrefix.java | 5 + pom.xml | 2 +- 27 files changed, 1090 insertions(+), 212 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/de1724c8/lang/common/proto/bridge/ClientProtocol.proto ---------------------------------------------------------------------- diff --git a/lang/common/proto/bridge/ClientProtocol.proto b/lang/common/proto/bridge/ClientProtocol.proto index 3962c00..317f9d7 100644 --- a/lang/common/proto/bridge/ClientProtocol.proto +++ b/lang/common/proto/bridge/ClientProtocol.proto @@ -34,13 +34,37 @@ message LocalRuntimeParameters { message YarnRuntimeParameters { string queue = 1; - string job_submission_directory = 2; - string filesystem_url = 3; - bytes security_token = 4; + uint32 priority = 2; + bool unmanged_driver = 3; + + // providers + string filesystem_url = 5; + string job_submission_directory_prefix = 6; } message AzureBatchRuntimeParameters { + // Azure Batch Account Information + string azure_batch_account_name = 1; + + string azure_batch_account_key = 2; + + string azure_batch_account_uri = 3; + + string azure_batch_pool_id = 4; + + // Azure Storage Account Information + string azure_storage_account_name = 10; + string azure_storage_account_key = 11; + + string azure_storage_container_name = 12; + + // General information + enum OS { + WINDOWS = 0; + LINUX = 1; + } + OS operating_system = 20; } message MesosRuntimeParameters { @@ -54,12 +78,14 @@ message DriverClientConfiguration { uint32 cpu_cores = 2; uint32 memory_mb = 3; + string driver_job_submission_directory = 4; + // the runtime on which to launch oneof runtime { - LocalRuntimeParameters local_runtime = 4; - YarnRuntimeParameters yarn_runtime = 5; - AzureBatchRuntimeParameters azbatch_runtime = 6; - MesosRuntimeParameters mesos_runtime = 7; + LocalRuntimeParameters 
local_runtime = 5; + YarnRuntimeParameters yarn_runtime = 6; + AzureBatchRuntimeParameters azbatch_runtime = 7; + MesosRuntimeParameters mesos_runtime = 8; } // The command to launch the driver client @@ -82,4 +108,6 @@ message DriverClientConfiguration { repeated string global_libraries = 22; repeated string local_libraries = 23; + // enable http driver + bool enable_http_driver = 25; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/de1724c8/lang/java/reef-bridge-proto-java/pom.xml ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/pom.xml b/lang/java/reef-bridge-proto-java/pom.xml index a1ccc33..467dd67 100644 --- a/lang/java/reef-bridge-proto-java/pom.xml +++ b/lang/java/reef-bridge-proto-java/pom.xml @@ -109,6 +109,21 @@ under the License. </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.apache.reef</groupId> + <artifactId>reef-io</artifactId> + <version>0.17.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.reef</groupId> + <artifactId>reef-io</artifactId> + <version>0.17.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.reef</groupId> + <artifactId>reef-runtime-azbatch</artifactId> + <version>0.17.0-SNAPSHOT</version> + </dependency> </dependencies> <build> @@ -213,6 +228,16 @@ under the License. 
<shadedPattern>${project.groupId}.${project.artifactId}.shaded.protobuf</shadedPattern> </relocation> </relocations> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> </configuration> </execution> </executions> http://git-wip-us.apache.org/repos/asf/reef/blob/de1724c8/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverServiceLauncher.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverServiceLauncher.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverServiceLauncher.java index 0f3567c..f2dfddd 100644 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverServiceLauncher.java +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/DriverServiceLauncher.java @@ -20,10 +20,10 @@ package org.apache.reef.bridge.client; import com.google.protobuf.util.JsonFormat; import org.apache.commons.lang.StringUtils; -import org.apache.reef.bridge.client.launch.LocalDriverServiceRuntimeLauncher; -import org.apache.reef.bridge.client.launch.YarnDriverServiceRuntimeLauncher; -import org.apache.reef.bridge.client.runtime.LocalDriverRuntimeConfigurationProvider; -import org.apache.reef.bridge.client.runtime.YarnDriverRuntimeConfigurationProvider; +import org.apache.reef.bridge.driver.launch.IDriverLauncher; +import org.apache.reef.bridge.driver.launch.azbatch.AzureBatchLauncher; +import org.apache.reef.bridge.driver.launch.local.LocalLauncher; +import org.apache.reef.bridge.driver.launch.yarn.YarnLauncher; import org.apache.reef.bridge.driver.service.IDriverServiceConfigurationProvider; import org.apache.reef.bridge.driver.service.grpc.GRPCDriverServiceConfigurationProvider; import 
org.apache.reef.bridge.driver.client.JavaDriverClientLauncher; @@ -87,11 +87,14 @@ public final class DriverServiceLauncher { driverClientConfigurationProto.getRuntimeCase() == ClientProtocol.DriverClientConfiguration.RuntimeCase.YARN_RUNTIME ? Tang.Factory.getTang().newConfigurationBuilder() + .bind(RuntimePathProvider.class, WindowsRuntimePathProvider.class) .bind(RuntimeClasspathProvider.class, YarnClasspathProvider.class) + .bindConstructor(org.apache.hadoop.yarn.conf.YarnConfiguration.class, + YarnConfigurationConstructor.class) .build() : Tang.Factory.getTang().newConfigurationBuilder() - .bind(RuntimePathProvider.class, - OSUtils.isWindows() ? WindowsRuntimePathProvider.class : UnixJVMPathProvider.class) + .bind(RuntimePathProvider.class, OSUtils.isWindows() ? + WindowsRuntimePathProvider.class : UnixJVMPathProvider.class) .bind(RuntimeClasspathProvider.class, LocalClasspathProvider.class) .build(); final Injector runtimeInjector = Tang.Factory.getTang().newInjector(runtimeOSConfiguration); @@ -124,30 +127,34 @@ public final class DriverServiceLauncher { } } - private static IDriverServiceRuntimeLauncher getLocalDriverServiceLauncher() throws InjectionException { + private static IDriverLauncher getLocalDriverServiceLauncher() throws InjectionException { final Configuration localJobSubmissionClientConfig = Tang.Factory.getTang().newConfigurationBuilder() - .bindImplementation(IDriverRuntimeConfigurationProvider.class, - LocalDriverRuntimeConfigurationProvider.class) + .bindImplementation(IDriverLauncher.class, LocalLauncher.class) .bindImplementation(IDriverServiceConfigurationProvider.class, GRPCDriverServiceConfigurationProvider.class) - .bindImplementation(RuntimeClasspathProvider.class, LocalClasspathProvider.class) .build(); return Tang.Factory.getTang() - .newInjector(localJobSubmissionClientConfig).getInstance(LocalDriverServiceRuntimeLauncher.class); + .newInjector(localJobSubmissionClientConfig).getInstance(LocalLauncher.class); } - private static 
IDriverServiceRuntimeLauncher getYarnDriverServiceLauncher() throws InjectionException { + private static IDriverLauncher getYarnDriverServiceLauncher() throws InjectionException { final Configuration yarnJobSubmissionClientConfig = Tang.Factory.getTang().newConfigurationBuilder() - .bindImplementation(IDriverRuntimeConfigurationProvider.class, - YarnDriverRuntimeConfigurationProvider.class) + .bindImplementation(IDriverLauncher.class, YarnLauncher.class) .bindImplementation(IDriverServiceConfigurationProvider.class, GRPCDriverServiceConfigurationProvider.class) - .bindImplementation(RuntimeClasspathProvider.class, YarnClasspathProvider.class) - .bindConstructor(org.apache.hadoop.yarn.conf.YarnConfiguration.class, YarnConfigurationConstructor.class) .build(); return Tang.Factory.getTang() - .newInjector(yarnJobSubmissionClientConfig).getInstance(YarnDriverServiceRuntimeLauncher.class); + .newInjector(yarnJobSubmissionClientConfig).getInstance(YarnLauncher.class); + } + + private static IDriverLauncher getAzureBatchDriverServiceLauncher() throws InjectionException { + final Configuration azbatchJobSubmissionClientConfig = Tang.Factory.getTang().newConfigurationBuilder() + .bindImplementation(IDriverLauncher.class, AzureBatchLauncher.class) + .bindImplementation(IDriverServiceConfigurationProvider.class, + GRPCDriverServiceConfigurationProvider.class) + .build(); + return Tang.Factory.getTang().newInjector(azbatchJobSubmissionClientConfig).getInstance(AzureBatchLauncher.class); } /** @@ -176,13 +183,17 @@ public final class DriverServiceLauncher { driverClientConfigurationProtoBuilder.build(); switch (driverClientConfigurationProto.getRuntimeCase()) { case YARN_RUNTIME: - final IDriverServiceRuntimeLauncher yarnDriverServiceLauncher = getYarnDriverServiceLauncher(); + final IDriverLauncher yarnDriverServiceLauncher = getYarnDriverServiceLauncher(); yarnDriverServiceLauncher.launch(driverClientConfigurationProto); break; case LOCAL_RUNTIME: - final 
IDriverServiceRuntimeLauncher localDriverServiceLauncher = getLocalDriverServiceLauncher(); + final IDriverLauncher localDriverServiceLauncher = getLocalDriverServiceLauncher(); localDriverServiceLauncher.launch(driverClientConfigurationProto); break; + case AZBATCH_RUNTIME: + final IDriverLauncher azureBatchDriverServiceLauncher = getAzureBatchDriverServiceLauncher(); + azureBatchDriverServiceLauncher.launch(driverClientConfigurationProto); + break; default: } LOG.log(Level.INFO, "JavaBridge: Stop Client {0}", driverClientConfigurationProto.getJobid()); http://git-wip-us.apache.org/repos/asf/reef/blob/de1724c8/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/launch/LocalDriverServiceRuntimeLauncher.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/launch/LocalDriverServiceRuntimeLauncher.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/launch/LocalDriverServiceRuntimeLauncher.java index 9550a0c..44ac5ea 100644 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/launch/LocalDriverServiceRuntimeLauncher.java +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/launch/LocalDriverServiceRuntimeLauncher.java @@ -47,9 +47,8 @@ public final class LocalDriverServiceRuntimeLauncher implements IDriverServiceRu @Override public void launch(final ClientProtocol.DriverClientConfiguration driverClientConfiguration) { try { - DriverLauncher.getLauncher( - driverRuntimeConfigurationProvider.getConfiguration(driverClientConfiguration)) - .run(driverServiceConfigurationProvider.getConfiguration(driverClientConfiguration)); + DriverLauncher.getLauncher(driverRuntimeConfigurationProvider.getConfiguration(driverClientConfiguration)) + .run(driverServiceConfigurationProvider.getDriverServiceConfiguration(driverClientConfiguration)); } catch 
(InjectionException e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/reef/blob/de1724c8/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/launch/YarnDriverServiceRuntimeLauncher.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/launch/YarnDriverServiceRuntimeLauncher.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/launch/YarnDriverServiceRuntimeLauncher.java index a2f4039..9ada0d1 100644 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/launch/YarnDriverServiceRuntimeLauncher.java +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/launch/YarnDriverServiceRuntimeLauncher.java @@ -24,15 +24,20 @@ import org.apache.reef.bridge.driver.service.IDriverServiceConfigurationProvider import org.apache.reef.bridge.client.IDriverServiceRuntimeLauncher; import org.apache.reef.bridge.proto.ClientProtocol; import org.apache.reef.client.DriverLauncher; +import org.apache.reef.client.LauncherStatus; import org.apache.reef.tang.exceptions.InjectionException; import javax.inject.Inject; +import java.util.logging.Level; +import java.util.logging.Logger; /** * Yarn driver service launcher. 
*/ public final class YarnDriverServiceRuntimeLauncher implements IDriverServiceRuntimeLauncher { + private static final Logger LOG = Logger.getLogger(YarnDriverServiceRuntimeLauncher.class.getName()); + private final IDriverRuntimeConfigurationProvider driverRuntimeConfigurationProvider; private final IDriverServiceConfigurationProvider driverServiceConfigurationProvider; @@ -48,9 +53,14 @@ public final class YarnDriverServiceRuntimeLauncher implements IDriverServiceRun @Override public void launch(final ClientProtocol.DriverClientConfiguration driverClientConfiguration) { try { - DriverLauncher.getLauncher( + final LauncherStatus status = DriverLauncher.getLauncher( driverRuntimeConfigurationProvider.getConfiguration(driverClientConfiguration)) - .run(driverServiceConfigurationProvider.getConfiguration(driverClientConfiguration)); + .run(driverServiceConfigurationProvider.getDriverServiceConfiguration(driverClientConfiguration)); + LOG.log(Level.INFO, "Job complete status: " + status.toString()); + if (status.getError().isPresent()) { + LOG.log(Level.SEVERE, status.getError().get().getMessage()); + status.getError().get().printStackTrace(); + } } catch (InjectionException e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/reef/blob/de1724c8/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/runtime/YarnDriverRuntimeConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/runtime/YarnDriverRuntimeConfigurationProvider.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/runtime/YarnDriverRuntimeConfigurationProvider.java index ad7bd7d..016f885 100644 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/runtime/YarnDriverRuntimeConfigurationProvider.java +++ 
b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/client/runtime/YarnDriverRuntimeConfigurationProvider.java @@ -22,15 +22,12 @@ package org.apache.reef.bridge.client.runtime; import org.apache.commons.lang.StringUtils; import org.apache.reef.bridge.client.IDriverRuntimeConfigurationProvider; import org.apache.reef.bridge.proto.ClientProtocol; -import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier; -import org.apache.reef.runtime.yarn.driver.RuntimeIdentifier; -import org.apache.reef.runtime.yarn.driver.YarnDriverConfiguration; +import org.apache.reef.runtime.yarn.client.YarnClientConfiguration; import org.apache.reef.runtime.yarn.driver.parameters.FileSystemUrl; import org.apache.reef.tang.Configuration; import org.apache.reef.tang.Configurations; import org.apache.reef.tang.JavaConfigurationBuilder; import org.apache.reef.tang.Tang; -import org.apache.reef.tang.formats.ConfigurationModule; import javax.inject.Inject; @@ -45,22 +42,16 @@ public final class YarnDriverRuntimeConfigurationProvider implements IDriverRunt @Override public Configuration getConfiguration(final ClientProtocol.DriverClientConfiguration driverConfiguration) { - ConfigurationModule yarnDriverConfiguration = YarnDriverConfiguration.CONF - .set(YarnDriverConfiguration.JOB_IDENTIFIER, driverConfiguration.getJobid()) - .set(YarnDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, ClientRemoteIdentifier.NONE) - .set(YarnDriverConfiguration.JVM_HEAP_SLACK, 0.0) - .set(YarnDriverConfiguration.RUNTIME_NAMES, RuntimeIdentifier.RUNTIME_NAME); - if (StringUtils.isNotEmpty(driverConfiguration.getYarnRuntime().getJobSubmissionDirectory())) { - yarnDriverConfiguration = yarnDriverConfiguration - .set(YarnDriverConfiguration.JOB_SUBMISSION_DIRECTORY, - driverConfiguration.getYarnRuntime().getJobSubmissionDirectory()); - } + Configuration yarnConfiguration = YarnClientConfiguration.CONF + .set(YarnClientConfiguration.UNMANAGED_DRIVER, 
driverConfiguration.getYarnRuntime().getUnmangedDriver()) + .set(YarnClientConfiguration.YARN_PRIORITY, driverConfiguration.getYarnRuntime().getPriority()) + .set(YarnClientConfiguration.JVM_HEAP_SLACK, 0.0) + .build(); if (StringUtils.isNotEmpty(driverConfiguration.getYarnRuntime().getFilesystemUrl())) { - JavaConfigurationBuilder providerConfig = Tang.Factory.getTang().newConfigurationBuilder() + final JavaConfigurationBuilder providerConfig = Tang.Factory.getTang().newConfigurationBuilder() .bindNamedParameter(FileSystemUrl.class, driverConfiguration.getYarnRuntime().getFilesystemUrl()); - return Configurations.merge(yarnDriverConfiguration.build(), providerConfig.build()); - } else { - return yarnDriverConfiguration.build(); + yarnConfiguration = Configurations.merge(yarnConfiguration, providerConfig.build()); } + return yarnConfiguration; } } http://git-wip-us.apache.org/repos/asf/reef/blob/de1724c8/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/DriverClientService.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/DriverClientService.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/DriverClientService.java index 12fd334..3ddeafb 100644 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/DriverClientService.java +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/grpc/DriverClientService.java @@ -163,6 +163,7 @@ public final class DriverClientService extends DriverClientGrpc.DriverClientImpl } finally { responseObserver.onNext(null); responseObserver.onCompleted(); + this.server.shutdown(); } } http://git-wip-us.apache.org/repos/asf/reef/blob/de1724c8/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/IDriverLauncher.java 
---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/IDriverLauncher.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/IDriverLauncher.java new file mode 100644 index 0000000..c122552 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/IDriverLauncher.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.bridge.driver.launch; + +import org.apache.reef.bridge.proto.ClientProtocol; + +/** + * All driver launchers implement this method. + */ +public interface IDriverLauncher { + + /** + * Launch the driver with the dynamic {@link ClientProtocol.DriverClientConfiguration}. 
+ * @param driverClientConfiguration dynamic driver configuration parameters + */ + void launch(final ClientProtocol.DriverClientConfiguration driverClientConfiguration); +} http://git-wip-us.apache.org/repos/asf/reef/blob/de1724c8/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/azbatch/AzureBatchDriverConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/azbatch/AzureBatchDriverConfigurationProvider.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/azbatch/AzureBatchDriverConfigurationProvider.java new file mode 100644 index 0000000..c4dd7f2 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/azbatch/AzureBatchDriverConfigurationProvider.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.bridge.driver.launch.azbatch; + +import org.apache.reef.bridge.driver.service.IDriverServiceConfigurationProvider; +import org.apache.reef.bridge.proto.ClientProtocol; +import org.apache.reef.runtime.common.client.DriverConfigurationProvider; +import org.apache.reef.runtime.common.driver.parameters.ClientRemoteIdentifier; +import org.apache.reef.tang.Configuration; + +import javax.inject.Inject; +import java.io.File; +import java.io.IOException; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * This is the Java Driver configuration generator for .NET Drivers that generates + * the Driver configuration at runtime. Called by {@link AzureBatchLauncher}. + */ +final class AzureBatchDriverConfigurationProvider { + + private static final Logger LOG = Logger.getLogger(AzureBatchDriverConfigurationProvider.class.getName()); + + // The driver service configuration provider + private final IDriverServiceConfigurationProvider driverServiceConfigurationProvider; + + // The driver runtime configuration provider + private final DriverConfigurationProvider driverConfigurationProvider; + + @Inject + private AzureBatchDriverConfigurationProvider( + final DriverConfigurationProvider driverConfigurationProvider, + final IDriverServiceConfigurationProvider driverServiceConfigurationProvider) { + this.driverConfigurationProvider = driverConfigurationProvider; + this.driverServiceConfigurationProvider = driverServiceConfigurationProvider; + } + + Configuration getDriverConfigurationFromParams( + final ClientProtocol.DriverClientConfiguration driverClientConfiguration) throws IOException { + + final String jobId = driverClientConfiguration.getJobid(); + final File jobFolder = new File(driverClientConfiguration.getDriverJobSubmissionDirectory()); + + LOG.log(Level.INFO, "jobFolder {0} jobId {1}.", new Object[]{jobFolder.toURI(), jobId}); + + return this.driverConfigurationProvider.getDriverConfiguration( + jobFolder.toURI(), 
ClientRemoteIdentifier.NONE, jobId, + driverServiceConfigurationProvider.getDriverServiceConfiguration(driverClientConfiguration)); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/de1724c8/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/azbatch/AzureBatchLauncher.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/azbatch/AzureBatchLauncher.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/azbatch/AzureBatchLauncher.java new file mode 100644 index 0000000..6390035 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/azbatch/AzureBatchLauncher.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.bridge.driver.launch.azbatch; + +import org.apache.reef.bridge.driver.launch.IDriverLauncher; +import org.apache.reef.bridge.proto.ClientProtocol; +import org.apache.reef.runtime.azbatch.client.AzureBatchRuntimeConfiguration; +import org.apache.reef.runtime.azbatch.client.AzureBatchRuntimeConfigurationCreator; +import org.apache.reef.runtime.common.REEFEnvironment; +import org.apache.reef.runtime.common.evaluator.PIDStoreStartHandler; +import org.apache.reef.runtime.common.launch.REEFErrorHandler; +import org.apache.reef.runtime.common.launch.REEFMessageCodec; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.exceptions.InjectionException; +import org.apache.reef.wake.remote.RemoteConfiguration; +import org.apache.reef.wake.time.Clock; + +import javax.inject.Inject; +import java.io.IOException; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * This is a bootstrap launcher for Azure Batch for submission from C#. It allows for Java Driver + * configuration generation directly on the Driver without need of Java dependency if REST + * submission is used. 
+ */ +public final class AzureBatchLauncher implements IDriverLauncher { + + private static final Logger LOG = Logger.getLogger(AzureBatchLauncher.class.getName()); + private static final Tang TANG = Tang.Factory.getTang(); + + @Inject + private AzureBatchLauncher() { + } + + public void launch(final ClientProtocol.DriverClientConfiguration driverClientConfiguration) { + try { + final AzureBatchDriverConfigurationProvider azureBatchDriverConfigurationProvider = + TANG.newInjector(generateConfigurationFromJobSubmissionParameters(driverClientConfiguration)) + .getInstance(AzureBatchDriverConfigurationProvider.class); + + final Configuration launcherConfig = + TANG.newConfigurationBuilder() + .bindNamedParameter(RemoteConfiguration.ManagerName.class, "AzureBatchLauncher") + .bindNamedParameter(RemoteConfiguration.ErrorHandler.class, REEFErrorHandler.class) + .bindNamedParameter(RemoteConfiguration.MessageCodec.class, REEFMessageCodec.class) + .bindSetEntry(Clock.RuntimeStartHandler.class, PIDStoreStartHandler.class) + .build(); + + try (final REEFEnvironment reef = REEFEnvironment.fromConfiguration( + azureBatchDriverConfigurationProvider.getDriverConfigurationFromParams(driverClientConfiguration), + launcherConfig)) { + reef.run(); + } catch (final InjectionException ex) { + throw fatal("Unable to configure and start REEFEnvironment.", ex); + } + } catch (InjectionException | IOException e) { + throw fatal("Unable to configure and start REEFEnvironment.", e); + } + + LOG.log(Level.INFO, "Exiting BootstrapLauncher.main()"); + + System.exit(0); // TODO[REEF-1715]: Should be able to exit cleanly at the end of main() + } + + private static Configuration generateConfigurationFromJobSubmissionParameters( + final ClientProtocol.DriverClientConfiguration driverClientConfiguration) { + return AzureBatchRuntimeConfigurationCreator.getOrCreateAzureBatchRuntimeConfiguration( + driverClientConfiguration.getAzbatchRuntime().getOperatingSystem() == + 
ClientProtocol.AzureBatchRuntimeParameters.OS.WINDOWS) + .set(AzureBatchRuntimeConfiguration.AZURE_BATCH_ACCOUNT_NAME, + driverClientConfiguration.getAzbatchRuntime().getAzureBatchAccountName()) + .set(AzureBatchRuntimeConfiguration.AZURE_BATCH_ACCOUNT_KEY, + driverClientConfiguration.getAzbatchRuntime().getAzureBatchAccountKey()) + .set(AzureBatchRuntimeConfiguration.AZURE_BATCH_ACCOUNT_URI, + driverClientConfiguration.getAzbatchRuntime().getAzureBatchAccountUri()) + .set(AzureBatchRuntimeConfiguration.AZURE_BATCH_POOL_ID, + driverClientConfiguration.getAzbatchRuntime().getAzureBatchPoolId()) + .set(AzureBatchRuntimeConfiguration.AZURE_STORAGE_ACCOUNT_NAME, + driverClientConfiguration.getAzbatchRuntime().getAzureStorageAccountName()) + .set(AzureBatchRuntimeConfiguration.AZURE_STORAGE_ACCOUNT_KEY, + driverClientConfiguration.getAzbatchRuntime().getAzureStorageAccountKey()) + .set(AzureBatchRuntimeConfiguration.AZURE_STORAGE_CONTAINER_NAME, + driverClientConfiguration.getAzbatchRuntime().getAzureStorageContainerName()) + .build(); + } + + private static RuntimeException fatal(final String msg, final Throwable t) { + LOG.log(Level.SEVERE, msg, t); + return new RuntimeException(msg, t); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/de1724c8/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/azbatch/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/azbatch/package-info.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/azbatch/package-info.java new file mode 100644 index 0000000..8d1d237 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/azbatch/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Azure Batch driver launcher for the driver bridge service. + */ +package org.apache.reef.bridge.driver.launch.azbatch; http://git-wip-us.apache.org/repos/asf/reef/blob/de1724c8/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/local/LocalLauncher.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/local/LocalLauncher.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/local/LocalLauncher.java new file mode 100644 index 0000000..153693c --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/local/LocalLauncher.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.bridge.driver.launch.local; + +import org.apache.commons.lang.StringUtils; +import org.apache.reef.bridge.driver.launch.IDriverLauncher; +import org.apache.reef.bridge.driver.service.IDriverServiceConfigurationProvider; +import org.apache.reef.bridge.proto.ClientProtocol; +import org.apache.reef.client.DriverLauncher; +import org.apache.reef.runtime.local.client.LocalRuntimeConfiguration; +import org.apache.reef.tang.exceptions.InjectionException; +import org.apache.reef.tang.formats.ConfigurationModule; + +import javax.inject.Inject; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Submits a folder containing a Driver to the local runtime. 
+ */
public final class LocalLauncher implements IDriverLauncher {

  private static final Logger LOG = Logger.getLogger(LocalLauncher.class.getName());

  private final IDriverServiceConfigurationProvider driverServiceConfigurationProvider;

  @Inject
  private LocalLauncher(final IDriverServiceConfigurationProvider driverServiceConfigurationProvider) {
    this.driverServiceConfigurationProvider = driverServiceConfigurationProvider;
  }

  /**
   * Runs the driver service on the local runtime using the settings found in the
   * given client configuration.
   *
   * @param driverClientConfiguration the driver client configuration protocol buffer.
   * @throws RuntimeException if the launcher cannot be injected.
   */
  public void launch(final ClientProtocol.DriverClientConfiguration driverClientConfiguration) {
    final ConfigurationModule runtimeModule = createLocalRuntimeModule(driverClientConfiguration);
    try {
      DriverLauncher
          .getLauncher(runtimeModule.build())
          .run(driverServiceConfigurationProvider.getDriverServiceConfiguration(driverClientConfiguration));
    } catch (final InjectionException cause) {
      throw fatal("local runtime", cause);
    }
  }

  /**
   * Assembles the local runtime configuration module. A setting is only applied when the
   * client supplied a positive number or a non-empty string for it.
   *
   * @param driverClientConfiguration the driver client configuration protocol buffer.
   * @return the configuration module with all supplied settings applied.
   */
  private static ConfigurationModule createLocalRuntimeModule(
      final ClientProtocol.DriverClientConfiguration driverClientConfiguration) {
    ConfigurationModule module = LocalRuntimeConfiguration.CONF;

    if (driverClientConfiguration.getLocalRuntime().getMaxNumberOfEvaluators() > 0) {
      module = module.set(LocalRuntimeConfiguration.MAX_NUMBER_OF_EVALUATORS,
          driverClientConfiguration.getLocalRuntime().getMaxNumberOfEvaluators());
    }

    final String runtimeRootFolder = driverClientConfiguration.getLocalRuntime().getRuntimeRootFolder();
    if (StringUtils.isNotEmpty(runtimeRootFolder)) {
      module = module.set(LocalRuntimeConfiguration.RUNTIME_ROOT_FOLDER, runtimeRootFolder);
    }

    if (driverClientConfiguration.getLocalRuntime().getJvmHeapSlack() > 0.0) {
      module = module.set(LocalRuntimeConfiguration.JVM_HEAP_SLACK,
          driverClientConfiguration.getLocalRuntime().getJvmHeapSlack());
    }

    // NOTE(review): RUNTIME_ROOT_FOLDER is also set above from the local runtime parameters.
    // When both values are non-empty the parameter is set twice; confirm that this is intended
    // and that the configuration module accepts it.
    final String jobSubmissionDirectory = driverClientConfiguration.getDriverJobSubmissionDirectory();
    if (StringUtils.isNotEmpty(jobSubmissionDirectory)) {
      module = module.set(LocalRuntimeConfiguration.RUNTIME_ROOT_FOLDER, jobSubmissionDirectory);
    }

    return module;
  }

  /**
   * Logs the failure at SEVERE level and wraps it for the caller to throw.
   *
   * @param message the message to log and to attach to the exception.
   * @param cause the cause of the failure.
   * @return a {@link RuntimeException} carrying the message and the cause.
   */
  private static RuntimeException fatal(final String message, final Throwable cause) {
    final RuntimeException wrapped = new RuntimeException(message, cause);
    LOG.log(Level.SEVERE, message, cause);
    return wrapped;
  }
}
http://git-wip-us.apache.org/repos/asf/reef/blob/de1724c8/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/local/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/local/package-info.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/local/package-info.java new file mode 100644 index 0000000..2ff6dcc --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/local/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * GRPC implementation for driver bridge service.
+ */ +package org.apache.reef.bridge.driver.launch.local; http://git-wip-us.apache.org/repos/asf/reef/blob/de1724c8/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/package-info.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/package-info.java new file mode 100644 index 0000000..85e9825 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Driver launchers (local, YARN, Azure Batch) for the driver bridge service.
+ */ +package org.apache.reef.bridge.driver.launch; http://git-wip-us.apache.org/repos/asf/reef/blob/de1724c8/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/yarn/YarnLauncher.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/yarn/YarnLauncher.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/yarn/YarnLauncher.java new file mode 100644 index 0000000..21f3989 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/yarn/YarnLauncher.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.reef.bridge.driver.launch.yarn; + +import org.apache.commons.lang.StringUtils; +import org.apache.reef.bridge.driver.launch.IDriverLauncher; +import org.apache.reef.bridge.driver.service.IDriverServiceConfigurationProvider; +import org.apache.reef.bridge.driver.service.grpc.GRPCDriverServiceConfigurationProvider; +import org.apache.reef.bridge.proto.ClientProtocol; +import org.apache.reef.client.DriverLauncher; +import org.apache.reef.client.LauncherStatus; +import org.apache.reef.runtime.yarn.client.YarnClientConfiguration; +import org.apache.reef.runtime.yarn.driver.parameters.FileSystemUrl; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.Configurations; +import org.apache.reef.tang.JavaConfigurationBuilder; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.exceptions.InjectionException; + +import javax.inject.Inject; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * This is a bootstrap launcher for YARN for submission from the bridge. It allows for Java Driver + * configuration generation directly on the Driver without need of Java dependency if REST + * submission is used. 
+ */
public final class YarnLauncher implements IDriverLauncher {
  private static final Logger LOG = Logger.getLogger(YarnLauncher.class.getName());
  private static final Tang TANG = Tang.Factory.getTang();

  @Inject
  private YarnLauncher(){
  }

  /**
   * Submits the driver service to YARN and logs the final launcher status.
   * An error carried by the final status is logged together with its stack trace.
   *
   * @param driverClientConfiguration the driver client configuration protocol buffer.
   * @throws RuntimeException if configuration, injection or submission fails; the original
   *     exception is attached as the direct cause.
   */
  public void launch(final ClientProtocol.DriverClientConfiguration driverClientConfiguration) {
    try {
      final IDriverServiceConfigurationProvider driverConfigurationProvider =
          TANG.newInjector(TANG.newConfigurationBuilder()
              .bindImplementation(IDriverServiceConfigurationProvider.class,
                  GRPCDriverServiceConfigurationProvider.class)
              .build()
          ).getInstance(IDriverServiceConfigurationProvider.class);

      final LauncherStatus status = DriverLauncher.getLauncher(createYarnConfiguration(driverClientConfiguration))
          .run(driverConfigurationProvider.getDriverServiceConfiguration(driverClientConfiguration));

      LOG.log(Level.INFO, "Job complete status: {0}", status);
      if (status.getError().isPresent()) {
        // Hand the throwable to the logger so that the stack trace ends up in the log
        // instead of on stderr, and a null exception message cannot produce an empty record.
        final Throwable error = status.getError().get();
        LOG.log(Level.SEVERE, "Job completed with an error: " + error.getMessage(), error);
      }
    } catch (final Exception e) {
      // Single wrap: the caught exception becomes the direct cause of the RuntimeException.
      throw fatal("Failed to initialize configurations.", e);
    }
  }

  /**
   * Creates the YARN client configuration from the YARN section of the driver client
   * configuration. The file system URL is only bound when the client supplied a non-empty one.
   *
   * @param driverClientConfiguration the driver client configuration protocol buffer.
   * @return the YARN client configuration.
   */
  private static Configuration createYarnConfiguration(
      final ClientProtocol.DriverClientConfiguration driverClientConfiguration) {
    final Configuration yarnConfiguration = YarnClientConfiguration.CONF
        .set(YarnClientConfiguration.UNMANAGED_DRIVER,
            driverClientConfiguration.getYarnRuntime().getUnmangedDriver())
        .set(YarnClientConfiguration.YARN_PRIORITY, driverClientConfiguration.getYarnRuntime().getPriority())
        .set(YarnClientConfiguration.JVM_HEAP_SLACK, 0.0)
        .build();

    final String fileSystemUrl = driverClientConfiguration.getYarnRuntime().getFilesystemUrl();
    if (StringUtils.isEmpty(fileSystemUrl)) {
      return yarnConfiguration;
    }

    final JavaConfigurationBuilder providerConfig = TANG.newConfigurationBuilder()
        .bindNamedParameter(FileSystemUrl.class, fileSystemUrl);
    return Configurations.merge(yarnConfiguration, providerConfig.build());
  }

  /**
   * Logs the failure at SEVERE level and wraps it for the caller to throw.
   *
   * @param msg the message to log and to attach to the exception.
   * @param t the cause of the failure.
   * @return a {@link RuntimeException} carrying the message and the cause.
   */
  private static RuntimeException fatal(final String msg, final Throwable t) {
    LOG.log(Level.SEVERE, msg, t);
    return new RuntimeException(msg, t);
  }
}
http://git-wip-us.apache.org/repos/asf/reef/blob/de1724c8/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/yarn/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/yarn/package-info.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/yarn/package-info.java new file mode 100644 index 0000000..fcbbdb7 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/launch/yarn/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * GRPC implementation for driver bridge service.
+ */ +package org.apache.reef.bridge.driver.launch.yarn; http://git-wip-us.apache.org/repos/asf/reef/blob/de1724c8/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/DriverServiceConfiguration.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/DriverServiceConfiguration.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/DriverServiceConfiguration.java index e3656da..eebc51c 100644 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/DriverServiceConfiguration.java +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/DriverServiceConfiguration.java @@ -20,11 +20,21 @@ package org.apache.reef.bridge.driver.service; import org.apache.reef.annotations.audience.Private; import org.apache.reef.bridge.service.parameters.DriverClientCommand; +import org.apache.reef.client.DriverConfiguration; +import org.apache.reef.client.DriverRestartConfiguration; import org.apache.reef.driver.parameters.DriverIdleSources; +import org.apache.reef.io.network.naming.NameServerConfiguration; +import org.apache.reef.runtime.common.driver.client.JobStatusHandler; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.Configurations; +import org.apache.reef.tang.Tang; import org.apache.reef.tang.formats.ConfigurationModule; import org.apache.reef.tang.formats.ConfigurationModuleBuilder; import org.apache.reef.tang.formats.RequiredImpl; import org.apache.reef.tang.formats.RequiredParameter; +import org.apache.reef.webserver.HttpHandlerConfiguration; +import org.apache.reef.webserver.HttpServerReefEventHandler; +import org.apache.reef.webserver.ReefEventStateManager; /** * Binds all driver bridge service handlers to the driver. 
@@ -42,4 +52,73 @@ public final class DriverServiceConfiguration extends ConfigurationModuleBuilder .bindNamedParameter(DriverClientCommand.class, DRIVER_CLIENT_COMMAND) .bindSetEntry(DriverIdleSources.class, IDriverService.class) .build(); + + + public static final ConfigurationModule STATIC_DRIVER_CONF_MODULE = DriverConfiguration.CONF + .set(DriverConfiguration.ON_DRIVER_STARTED, DriverServiceHandlers.StartHandler.class) + .set(DriverConfiguration.ON_DRIVER_STOP, DriverServiceHandlers.StopHandler.class) + .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, DriverServiceHandlers.AllocatedEvaluatorHandler.class) + .set(DriverConfiguration.ON_EVALUATOR_COMPLETED, DriverServiceHandlers.CompletedEvaluatorHandler.class) + .set(DriverConfiguration.ON_EVALUATOR_FAILED, DriverServiceHandlers.FailedEvaluatorHandler.class) + .set(DriverConfiguration.ON_CONTEXT_ACTIVE, DriverServiceHandlers.ActiveContextHandler.class) + .set(DriverConfiguration.ON_CONTEXT_CLOSED, DriverServiceHandlers.ClosedContextHandler.class) + .set(DriverConfiguration.ON_CONTEXT_FAILED, DriverServiceHandlers.ContextFailedHandler.class) + .set(DriverConfiguration.ON_CONTEXT_MESSAGE, DriverServiceHandlers.ContextMessageHandler.class) + .set(DriverConfiguration.ON_TASK_RUNNING, DriverServiceHandlers.RunningTaskHandler.class) + .set(DriverConfiguration.ON_TASK_COMPLETED, DriverServiceHandlers.CompletedTaskHandler.class) + .set(DriverConfiguration.ON_TASK_FAILED, DriverServiceHandlers.FailedTaskHandler.class) + .set(DriverConfiguration.ON_TASK_MESSAGE, DriverServiceHandlers.TaskMessageHandler.class) + .set(DriverConfiguration.ON_CLIENT_MESSAGE, DriverServiceHandlers.ClientMessageHandler.class) + .set(DriverConfiguration.ON_CLIENT_CLOSED, DriverServiceHandlers.ClientCloseHandler.class) + .set(DriverConfiguration.ON_CLIENT_CLOSED_MESSAGE, DriverServiceHandlers.ClientCloseWithMessageHandler.class); + + /** + * The HTTP Server configuration assumed by the bridge. 
+ */ + private static final Configuration HTTP_SERVER_CONFIGURATION = Configurations.merge( + HttpHandlerConfiguration.CONF + .set(HttpHandlerConfiguration.HTTP_HANDLERS, HttpServerReefEventHandler.class) + // Add the http status handler. + .set(HttpHandlerConfiguration.HTTP_HANDLERS, DriverStatusHTTPHandler.class) + .build(), + org.apache.reef.client.DriverServiceConfiguration.CONF + .set(org.apache.reef.client.DriverServiceConfiguration.ON_EVALUATOR_ALLOCATED, + ReefEventStateManager.AllocatedEvaluatorStateHandler.class) + .set(org.apache.reef.client.DriverServiceConfiguration.ON_CONTEXT_ACTIVE, + ReefEventStateManager.ActiveContextStateHandler.class) + .set(org.apache.reef.client.DriverServiceConfiguration.ON_TASK_RUNNING, + ReefEventStateManager.TaskRunningStateHandler.class) + .set(org.apache.reef.client.DriverServiceConfiguration.ON_DRIVER_STARTED, + ReefEventStateManager.StartStateHandler.class) + .set(org.apache.reef.client.DriverServiceConfiguration.ON_DRIVER_STOP, + ReefEventStateManager.StopStateHandler.class) + .build(), + DriverRestartConfiguration.CONF + .set(DriverRestartConfiguration.ON_DRIVER_RESTARTED, + ReefEventStateManager.DriverRestartHandler.class) + .set(DriverRestartConfiguration.ON_DRIVER_RESTART_CONTEXT_ACTIVE, + ReefEventStateManager.DriverRestartActiveContextStateHandler.class) + .set(DriverRestartConfiguration.ON_DRIVER_RESTART_TASK_RUNNING, + ReefEventStateManager.DriverRestartTaskRunningStateHandler.class) + .build(), + // Bind the HTTP handler for job status + Tang.Factory.getTang().newConfigurationBuilder() + .bindImplementation(JobStatusHandler.class, DriverStatusHTTPHandler.class) + .build() + ); + + /** + * The name server configuration assumed by the bridge. + */ + private static final Configuration NAME_SERVER_CONFIGURATION = NameServerConfiguration.CONF + .set(NameServerConfiguration.NAME_SERVICE_PORT, 0) + .build(); + + /** + * The driver configuration assumed by the the bridge. 
+ */ + public static final Configuration HTTP_AND_NAMESERVER = Configurations.merge( + HTTP_SERVER_CONFIGURATION, + NAME_SERVER_CONFIGURATION + ); } http://git-wip-us.apache.org/repos/asf/reef/blob/de1724c8/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/DriverServiceConfigurationProviderBase.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/DriverServiceConfigurationProviderBase.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/DriverServiceConfigurationProviderBase.java deleted file mode 100644 index 466c05f..0000000 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/DriverServiceConfigurationProviderBase.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.reef.bridge.driver.service; - -import org.apache.reef.bridge.driver.service.grpc.GRPCDriverService; -import org.apache.reef.bridge.proto.ClientProtocol; -import org.apache.reef.client.DriverConfiguration; -import org.apache.reef.client.DriverRestartConfiguration; -import org.apache.reef.client.parameters.DriverConfigurationProviders; -import org.apache.reef.io.TcpPortConfigurationProvider; -import org.apache.reef.tang.Configuration; -import org.apache.reef.tang.JavaConfigurationBuilder; -import org.apache.reef.tang.Tang; -import org.apache.reef.tang.formats.ConfigurationModule; -import org.apache.reef.util.EnvironmentUtils; -import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeBegin; -import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeCount; - -import java.util.ArrayList; -import java.util.List; - -/** - * Base class for all driver service configuration provider implementations. - */ -public abstract class DriverServiceConfigurationProviderBase implements IDriverServiceConfigurationProvider { - - private static final Tang TANG = Tang.Factory.getTang(); - - - protected Configuration getTcpPortRangeConfiguration( - final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto) { - JavaConfigurationBuilder configurationModuleBuilder = TANG.newConfigurationBuilder() - .bindSetEntry(DriverConfigurationProviders.class, TcpPortConfigurationProvider.class); - // Setup TCP constraints - if (driverClientConfigurationProto.getTcpPortRangeBegin() > 0) { - configurationModuleBuilder = configurationModuleBuilder - .bindNamedParameter(TcpPortRangeBegin.class, - Integer.toString(driverClientConfigurationProto.getTcpPortRangeBegin())); - } - if (driverClientConfigurationProto.getTcpPortRangeCount() > 0) { - configurationModuleBuilder = configurationModuleBuilder - .bindNamedParameter(TcpPortRangeCount.class, - Integer.toString(driverClientConfigurationProto.getTcpPortRangeCount())); - } - if 
(driverClientConfigurationProto.getTcpPortRangeTryCount() > 0) { - configurationModuleBuilder = configurationModuleBuilder - .bindNamedParameter(TcpPortRangeCount.class, - Integer.toString(driverClientConfigurationProto.getTcpPortRangeTryCount())); - } - return configurationModuleBuilder.build(); - } - - protected Configuration getDriverConfiguration( - final ClientProtocol.DriverClientConfiguration driverConfiguration) { - ConfigurationModule driverServiceConfigurationModule = DriverConfiguration.CONF - .set(DriverConfiguration.DRIVER_IDENTIFIER, driverConfiguration.getJobid()); - - // Set file dependencies - final List<String> localLibraries = new ArrayList<>(); - localLibraries.add(EnvironmentUtils.getClassLocation(GRPCDriverService.class)); - if (driverConfiguration.getLocalLibrariesCount() > 0) { - localLibraries.addAll(driverConfiguration.getLocalLibrariesList()); - } - driverServiceConfigurationModule = driverServiceConfigurationModule - .setMultiple(DriverConfiguration.LOCAL_LIBRARIES, localLibraries); - if (driverConfiguration.getGlobalLibrariesCount() > 0) { - driverServiceConfigurationModule = driverServiceConfigurationModule - .setMultiple(DriverConfiguration.GLOBAL_LIBRARIES, - driverConfiguration.getGlobalLibrariesList()); - } - if (driverConfiguration.getLocalFilesCount() > 0) { - driverServiceConfigurationModule = driverServiceConfigurationModule - .setMultiple(DriverConfiguration.LOCAL_FILES, - driverConfiguration.getLocalFilesList()); - } - if (driverConfiguration.getGlobalFilesCount() > 0) { - driverServiceConfigurationModule = driverServiceConfigurationModule - .setMultiple(DriverConfiguration.GLOBAL_FILES, - driverConfiguration.getGlobalFilesList()); - } - // Setup driver resources - if (driverConfiguration.getCpuCores() > 0) { - driverServiceConfigurationModule = driverServiceConfigurationModule - .set(DriverConfiguration.DRIVER_CPU_CORES, driverConfiguration.getCpuCores()); - } - if (driverConfiguration.getMemoryMb() > 0) { - 
driverServiceConfigurationModule = driverServiceConfigurationModule - .set(DriverConfiguration.DRIVER_MEMORY, driverConfiguration.getMemoryMb()); - } - // Setup handlers - driverServiceConfigurationModule = driverServiceConfigurationModule - .set(DriverConfiguration.ON_DRIVER_STARTED, DriverServiceHandlers.StartHandler.class) - .set(DriverConfiguration.ON_DRIVER_STOP, DriverServiceHandlers.StopHandler.class) - .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, DriverServiceHandlers.AllocatedEvaluatorHandler.class) - .set(DriverConfiguration.ON_EVALUATOR_COMPLETED, DriverServiceHandlers.CompletedEvaluatorHandler.class) - .set(DriverConfiguration.ON_EVALUATOR_FAILED, DriverServiceHandlers.FailedEvaluatorHandler.class) - .set(DriverConfiguration.ON_CONTEXT_ACTIVE, DriverServiceHandlers.ActiveContextHandler.class) - .set(DriverConfiguration.ON_CONTEXT_CLOSED, DriverServiceHandlers.ClosedContextHandler.class) - .set(DriverConfiguration.ON_CONTEXT_FAILED, DriverServiceHandlers.ContextFailedHandler.class) - .set(DriverConfiguration.ON_CONTEXT_MESSAGE, DriverServiceHandlers.ContextMessageHandler.class) - .set(DriverConfiguration.ON_TASK_RUNNING, DriverServiceHandlers.RunningTaskHandler.class) - .set(DriverConfiguration.ON_TASK_COMPLETED, DriverServiceHandlers.CompletedTaskHandler.class) - .set(DriverConfiguration.ON_TASK_FAILED, DriverServiceHandlers.FailedTaskHandler.class) - .set(DriverConfiguration.ON_TASK_MESSAGE, DriverServiceHandlers.TaskMessageHandler.class) - .set(DriverConfiguration.ON_CLIENT_MESSAGE, DriverServiceHandlers.ClientMessageHandler.class) - .set(DriverConfiguration.ON_CLIENT_CLOSED, DriverServiceHandlers.ClientCloseHandler.class) - .set(DriverConfiguration.ON_CLIENT_CLOSED_MESSAGE, DriverServiceHandlers.ClientCloseWithMessageHandler.class); - return driverServiceConfigurationModule.build(); - } - - protected Configuration getDriverRestartConfiguration( - final ClientProtocol.DriverClientConfiguration driverConfiguration) { - ConfigurationModule 
restartConfModule = DriverRestartConfiguration.CONF - .set(DriverRestartConfiguration.ON_DRIVER_RESTARTED, - DriverServiceHandlers.DriverRestartHandler.class) - .set(DriverRestartConfiguration.ON_DRIVER_RESTART_CONTEXT_ACTIVE, - DriverServiceHandlers.DriverRestartActiveContextHandler.class) - .set(DriverRestartConfiguration.ON_DRIVER_RESTART_TASK_RUNNING, - DriverServiceHandlers.DriverRestartRunningTaskHandler.class) - .set(DriverRestartConfiguration.ON_DRIVER_RESTART_COMPLETED, - DriverServiceHandlers.DriverRestartCompletedHandler.class) - .set(DriverRestartConfiguration.ON_DRIVER_RESTART_EVALUATOR_FAILED, - DriverServiceHandlers.DriverRestartFailedEvaluatorHandler.class); - return driverConfiguration.getDriverRestartEvaluatorRecoverySeconds() > 0 ? - restartConfModule - .set(DriverRestartConfiguration.DRIVER_RESTART_EVALUATOR_RECOVERY_SECONDS, - driverConfiguration.getDriverRestartEvaluatorRecoverySeconds()) - .build() : - restartConfModule.build(); - } -} http://git-wip-us.apache.org/repos/asf/reef/blob/de1724c8/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/DriverStatusHTTPHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/DriverStatusHTTPHandler.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/DriverStatusHTTPHandler.java new file mode 100644 index 0000000..c685ae7 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/DriverStatusHTTPHandler.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.bridge.driver.service; + +import org.apache.reef.bridge.driver.service.parameters.HTTPStatusAlarmInterval; +import org.apache.reef.bridge.driver.service.parameters.HTTPStatusNumberOfRetries; +import org.apache.reef.proto.ReefServiceProtos; +import org.apache.reef.runtime.common.driver.client.JobStatusHandler; +import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.time.Clock; +import org.apache.reef.wake.time.event.Alarm; +import org.apache.reef.webserver.HttpHandler; +import org.apache.reef.webserver.ParsedHttpRequest; + +import javax.inject.Inject; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.LinkedList; +import java.util.Queue; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Driver service status http handler. + */ +public final class DriverStatusHTTPHandler implements HttpHandler, JobStatusHandler { + + private static final Logger LOG = Logger.getLogger(DriverStatusHTTPHandler.class.getName()); + + /** + * The URI under which this handler answers. + */ + private String uriSpecification = "driverstatus"; + + /** + * A queue of messages to be sent to the client. 
+ */ + private final Queue<ReefServiceProtos.JobStatusProto> statusMessagesToSend = new LinkedList<>(); + + /** + * The last status received by this object in its role as JobStatusHandler. + */ + private ReefServiceProtos.JobStatusProto lastStatus = null; + + /** + * The clock is used to schedule a check whether the handler has been called. + */ + private final Clock clock; + + /** + * The maximum number of times the AlarmHandler will be scheduled. + */ + private final int maxNumberOfRetries; + + /** + * The interval between alarms. + */ + private final int alarmInterval; + + /** + * The current retry. + */ + private int retry = 0; + + /** + * The alarm handler to keep the Clock alive until the status has been requested once. + */ + private final EventHandler<Alarm> alarmHandler = new EventHandler<Alarm>() { + @Override + public void onNext(final Alarm value) { + scheduleAlarm(); + } + }; + + /** + * Whether or not this handler was called at least once via HTTP. + */ + private boolean wasCalledViaHTTP = false; + + @Inject + DriverStatusHTTPHandler(final Clock clock, + @Parameter(HTTPStatusNumberOfRetries.class) final int maxNumberOfRetries, + @Parameter(HTTPStatusAlarmInterval.class) final int alarmInterval) { + this.clock = clock; + this.maxNumberOfRetries = maxNumberOfRetries; + this.alarmInterval = alarmInterval; + scheduleAlarm(); + } + + @Override + public String getUriSpecification() { + return uriSpecification; + } + + @Override + public void setUriSpecification(final String newUriSpecification) { + this.uriSpecification = newUriSpecification; + } + + @Override + public void onHttpRequest(final ParsedHttpRequest parsedHttpRequest, final HttpServletResponse response) + throws IOException, ServletException { + try (final PrintWriter writer = response.getWriter()) { + writer.write(waitAndGetMessage()); + this.wasCalledViaHTTP = true; + } + } + + @Override + public void onNext(final ReefServiceProtos.JobStatusProto value) { + LOG.log(Level.INFO, "Received 
status: {0}", value.getState().name()); + // Record the status received and notify the thread to send an answer. + synchronized (this) { + this.statusMessagesToSend.add(value); + this.lastStatus = value; + this.notifyAll(); + } + } + + @Override + public ReefServiceProtos.JobStatusProto getLastStatus() { + return this.lastStatus; + } + + @Override + public String toString() { + return "DriverStatusHTTPHandler{uriSpec=" + getUriSpecification() + "}"; + } + + /** + * Waits for a status message to be available and returns it. + * + * @return the first available status message. + */ + String waitAndGetMessage() { + synchronized (this) { + // Wait for a message to send. + while (this.statusMessagesToSend.isEmpty()) { + try { + this.wait(); + } catch (final InterruptedException e) { + LOG.log(Level.FINE, "Interrupted. Ignoring."); + } + } + + // Send the message + return getMessageForStatus(this.statusMessagesToSend.poll()); + } + } + + /** + * Generates a string to be sent to the client based on a + * {@link ReefServiceProtos.JobStatusProto}. + * + * @param status the status to be converted to String. + * @return the string to be sent back to the HTTP client. + */ + static String getMessageForStatus(final ReefServiceProtos.JobStatusProto status) { + return status.getState().name(); + } + + /** + * Schedules an alarm, if needed. + * <p> + * The alarm will prevent the Clock from going idle. This gives the .NET Client time to make a call to this HTTP + * handler. + */ + private void scheduleAlarm() { + if (wasCalledViaHTTP || retry >= maxNumberOfRetries) { + // No alarm necessary anymore: log all three values that drove the decision. + LOG.log(Level.INFO, + "Not scheduling additional alarms after {0} out of max {1} retries. The HTTP handler was called: {2}", + new Object[] {retry, maxNumberOfRetries, wasCalledViaHTTP}); + return; + } + + // Scheduling an alarm will prevent the clock from going idle.
+ ++retry; + clock.scheduleAlarm(alarmInterval, alarmHandler); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/de1724c8/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/IDriverServiceConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/IDriverServiceConfigurationProvider.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/IDriverServiceConfigurationProvider.java index 8b8dea5..a369b61 100644 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/IDriverServiceConfigurationProvider.java +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/IDriverServiceConfigurationProvider.java @@ -19,13 +19,16 @@ package org.apache.reef.bridge.driver.service; -import org.apache.reef.bridge.client.IDriverBridgeConfigurationProvider; import org.apache.reef.bridge.driver.service.grpc.GRPCDriverServiceConfigurationProvider; +import org.apache.reef.bridge.proto.ClientProtocol; +import org.apache.reef.tang.Configuration; import org.apache.reef.tang.annotations.DefaultImplementation; /** * Configuration provider for the driver service. 
*/ @DefaultImplementation(GRPCDriverServiceConfigurationProvider.class) -public interface IDriverServiceConfigurationProvider extends IDriverBridgeConfigurationProvider { +public interface IDriverServiceConfigurationProvider { + + Configuration getDriverServiceConfiguration(final ClientProtocol.DriverClientConfiguration driverClientConfiguration); } http://git-wip-us.apache.org/repos/asf/reef/blob/de1724c8/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverService.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverService.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverService.java index ad7eaa8..86fbc1f 100644 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverService.java +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverService.java @@ -52,6 +52,7 @@ import org.apache.reef.wake.time.event.StopTime; import javax.inject.Inject; import java.io.File; import java.io.IOException; +import java.io.InputStream; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -130,11 +131,12 @@ public final class GRPCDriverService implements IDriverService { } else { final String cmd = this.driverClientCommand + " " + this.server.getPort(); final String cmdOs = OSUtils.isWindows() ? 
"cmd.exe /c \"" + cmd + "\"" : cmd; - final String cmdStd = cmdOs + " 1> driverclient.stdout 2> driverclient.stderr"; - this.driverProcess = Runtime.getRuntime().exec(cmdStd); + LOG.log(Level.INFO, "CMD: " + cmdOs); + this.driverProcess = Runtime.getRuntime().exec(cmdOs); synchronized (this) { // wait for driver client process to register while (this.clientStub == null && driverProcessIsAlive()) { + LOG.log(Level.INFO, "waiting for driver process to register"); this.wait(1000); // a second } } @@ -159,6 +161,7 @@ public final class GRPCDriverService implements IDriverService { } private void stop(final Throwable t) { + LOG.log(Level.INFO, "STOP: gRPC Driver Service", t); if (!stopped) { try { if (t != null) { @@ -172,16 +175,38 @@ public final class GRPCDriverService implements IDriverService { this.server = null; } if (this.driverProcess != null) { - LOG.log(Level.INFO, "Shutdown driver process"); + LOG.log(Level.INFO, "shutdown driver process"); + dump(); this.driverProcess.destroy(); this.driverProcess = null; } } finally { + LOG.log(Level.INFO, "COMPLETED STOP: gRPC Driver Service"); stopped = true; } } } + private void dump() { + if (!driverProcessIsAlive()) { + LOG.log(Level.INFO, "Exit code: " + this.driverProcess.exitValue()); + } + LOG.log(Level.INFO, "capturing driver process stderr"); + StringBuffer errBuffer = new StringBuffer(); + InputStream errStream = this.driverProcess.getErrorStream(); + try { + int nextChar; + errBuffer.append("\n==============================================\n"); + while ((nextChar = errStream.read()) != -1) { + errBuffer.append((char) nextChar); + } + errBuffer.append("\n==============================================\n"); + } catch (IOException e) { + LOG.log(Level.WARNING, "Error while capturing output stream: " + e.getMessage()); + } + LOG.log(Level.INFO, errBuffer.toString()); + } + /** * Await termination on the main thread since the grpc library uses daemon threads. 
*/ http://git-wip-us.apache.org/repos/asf/reef/blob/de1724c8/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverServiceConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverServiceConfigurationProvider.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverServiceConfigurationProvider.java index f0334c5..75ed5cc 100644 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverServiceConfigurationProvider.java +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/grpc/GRPCDriverServiceConfigurationProvider.java @@ -19,19 +19,33 @@ package org.apache.reef.bridge.driver.service.grpc; +import org.apache.commons.lang.StringUtils; import org.apache.reef.bridge.driver.service.DriverServiceConfiguration; -import org.apache.reef.bridge.driver.service.DriverServiceConfigurationProviderBase; +import org.apache.reef.bridge.driver.service.DriverServiceHandlers; +import org.apache.reef.bridge.driver.service.IDriverServiceConfigurationProvider; import org.apache.reef.bridge.proto.ClientProtocol; +import org.apache.reef.client.DriverConfiguration; +import org.apache.reef.client.DriverRestartConfiguration; +import org.apache.reef.client.parameters.DriverConfigurationProviders; +import org.apache.reef.io.TcpPortConfigurationProvider; import org.apache.reef.tang.Configuration; import org.apache.reef.tang.Configurations; +import org.apache.reef.tang.JavaConfigurationBuilder; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.formats.ConfigurationModule; +import org.apache.reef.util.EnvironmentUtils; +import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeBegin; +import 
org.apache.reef.wake.remote.ports.parameters.TcpPortRangeCount; import javax.inject.Inject; +import java.util.ArrayList; +import java.util.List; import java.util.logging.Logger; /** * GRPC driver service configuration provider. */ -public final class GRPCDriverServiceConfigurationProvider extends DriverServiceConfigurationProviderBase { +public final class GRPCDriverServiceConfigurationProvider implements IDriverServiceConfigurationProvider { private static final Logger LOG = Logger.getLogger(GRPCDriverServiceConfigurationProvider.class.getName()); @@ -39,15 +53,114 @@ public final class GRPCDriverServiceConfigurationProvider extends DriverServiceC private GRPCDriverServiceConfigurationProvider() { } + @Override - public Configuration getConfiguration(final ClientProtocol.DriverClientConfiguration driverConfiguration) { + public Configuration getDriverServiceConfiguration( + final ClientProtocol.DriverClientConfiguration driverConfiguration) { Configuration driverServiceConfiguration = DriverServiceConfiguration.CONF .set(DriverServiceConfiguration.DRIVER_SERVICE_IMPL, GRPCDriverService.class) .set(DriverServiceConfiguration.DRIVER_CLIENT_COMMAND, driverConfiguration.getDriverClientLaunchCommand()) .build(); - return Configurations.merge( - driverServiceConfiguration, - getDriverConfiguration(driverConfiguration), - getTcpPortRangeConfiguration(driverConfiguration)); + return driverConfiguration.getDriverRestartEnable() ? 
+ Configurations.merge( + driverServiceConfiguration, + getDriverRestartConfiguration(driverConfiguration), + getDriverConfiguration(driverConfiguration), + getTcpPortRangeConfiguration(driverConfiguration)) : + Configurations.merge( + driverServiceConfiguration, + getDriverConfiguration(driverConfiguration), + getTcpPortRangeConfiguration(driverConfiguration)); + } + + private Configuration getTcpPortRangeConfiguration( + final ClientProtocol.DriverClientConfiguration driverClientConfigurationProto) { + JavaConfigurationBuilder configurationModuleBuilder = Tang.Factory.getTang().newConfigurationBuilder() + .bindSetEntry(DriverConfigurationProviders.class, TcpPortConfigurationProvider.class); + // Setup TCP constraints: only positive values are bound, each to its own named parameter + if (driverClientConfigurationProto.getTcpPortRangeBegin() > 0) { + configurationModuleBuilder = configurationModuleBuilder + .bindNamedParameter(TcpPortRangeBegin.class, + Integer.toString(driverClientConfigurationProto.getTcpPortRangeBegin())); + } + if (driverClientConfigurationProto.getTcpPortRangeCount() > 0) { + configurationModuleBuilder = configurationModuleBuilder + .bindNamedParameter(TcpPortRangeCount.class, + Integer.toString(driverClientConfigurationProto.getTcpPortRangeCount())); + } + if (driverClientConfigurationProto.getTcpPortRangeTryCount() > 0) { + configurationModuleBuilder = configurationModuleBuilder + .bindNamedParameter(org.apache.reef.wake.remote.ports.parameters.TcpPortRangeTryCount.class, + Integer.toString(driverClientConfigurationProto.getTcpPortRangeTryCount())); + } + return configurationModuleBuilder.build(); + } + + private Configuration getDriverConfiguration( + final ClientProtocol.DriverClientConfiguration driverConfiguration) { + ConfigurationModule driverServiceConfigurationModule = DriverServiceConfiguration.STATIC_DRIVER_CONF_MODULE + .set(DriverConfiguration.DRIVER_IDENTIFIER, driverConfiguration.getJobid()); + + // Set file dependencies + final List<String> localLibraries = new ArrayList<>(); +
localLibraries.add(EnvironmentUtils.getClassLocation(GRPCDriverService.class)); + if (driverConfiguration.getLocalLibrariesCount() > 0) { + localLibraries.addAll(driverConfiguration.getLocalLibrariesList()); + } + driverServiceConfigurationModule = driverServiceConfigurationModule + .setMultiple(DriverConfiguration.LOCAL_LIBRARIES, localLibraries); + if (driverConfiguration.getGlobalLibrariesCount() > 0) { + driverServiceConfigurationModule = driverServiceConfigurationModule + .setMultiple(DriverConfiguration.GLOBAL_LIBRARIES, + driverConfiguration.getGlobalLibrariesList()); + } + if (driverConfiguration.getLocalFilesCount() > 0) { + driverServiceConfigurationModule = driverServiceConfigurationModule + .setMultiple(DriverConfiguration.LOCAL_FILES, + driverConfiguration.getLocalFilesList()); + } + if (driverConfiguration.getGlobalFilesCount() > 0) { + driverServiceConfigurationModule = driverServiceConfigurationModule + .setMultiple(DriverConfiguration.GLOBAL_FILES, + driverConfiguration.getGlobalFilesList()); + } + // Setup driver resources + if (driverConfiguration.getCpuCores() > 0) { + driverServiceConfigurationModule = driverServiceConfigurationModule + .set(DriverConfiguration.DRIVER_CPU_CORES, driverConfiguration.getCpuCores()); + } + if (driverConfiguration.getMemoryMb() > 0) { + driverServiceConfigurationModule = driverServiceConfigurationModule + .set(DriverConfiguration.DRIVER_MEMORY, driverConfiguration.getMemoryMb()); + } + // Job submission directory + if (StringUtils.isNotEmpty(driverConfiguration.getDriverJobSubmissionDirectory())) { + driverServiceConfigurationModule = driverServiceConfigurationModule + .set(DriverConfiguration.DRIVER_JOB_SUBMISSION_DIRECTORY, + driverConfiguration.getDriverJobSubmissionDirectory()); + } + return !driverConfiguration.getEnableHttpDriver() ? 
driverServiceConfigurationModule.build() : + Configurations.merge(DriverServiceConfiguration.HTTP_AND_NAMESERVER, driverServiceConfigurationModule.build()); + } + + private Configuration getDriverRestartConfiguration( + final ClientProtocol.DriverClientConfiguration driverConfiguration) { + ConfigurationModule restartConfModule = DriverRestartConfiguration.CONF + .set(DriverRestartConfiguration.ON_DRIVER_RESTARTED, + DriverServiceHandlers.DriverRestartHandler.class) + .set(DriverRestartConfiguration.ON_DRIVER_RESTART_CONTEXT_ACTIVE, + DriverServiceHandlers.DriverRestartActiveContextHandler.class) + .set(DriverRestartConfiguration.ON_DRIVER_RESTART_TASK_RUNNING, + DriverServiceHandlers.DriverRestartRunningTaskHandler.class) + .set(DriverRestartConfiguration.ON_DRIVER_RESTART_COMPLETED, + DriverServiceHandlers.DriverRestartCompletedHandler.class) + .set(DriverRestartConfiguration.ON_DRIVER_RESTART_EVALUATOR_FAILED, + DriverServiceHandlers.DriverRestartFailedEvaluatorHandler.class); + return driverConfiguration.getDriverRestartEvaluatorRecoverySeconds() > 0 ? 
+ restartConfModule + .set(DriverRestartConfiguration.DRIVER_RESTART_EVALUATOR_RECOVERY_SECONDS, + driverConfiguration.getDriverRestartEvaluatorRecoverySeconds()) + .build() : + restartConfModule.build(); } } http://git-wip-us.apache.org/repos/asf/reef/blob/de1724c8/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/parameters/HTTPStatusAlarmInterval.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/parameters/HTTPStatusAlarmInterval.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/parameters/HTTPStatusAlarmInterval.java new file mode 100644 index 0000000..7c013b2 --- /dev/null +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/service/parameters/HTTPStatusAlarmInterval.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.bridge.driver.service.parameters; + +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; + +/** + * The interval between alarms in DriverStatusHTTPHandler. 
+ */ +@NamedParameter(default_value = "200", doc = "The interval between alarms in DriverStatusHTTPHandler.") +public final class HTTPStatusAlarmInterval implements Name<Integer> { + + private HTTPStatusAlarmInterval() { + //intentionally empty + } +}
