http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchEvaluatorShimManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchEvaluatorShimManager.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchEvaluatorShimManager.java new file mode 100644 index 0000000..96f247a --- /dev/null +++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchEvaluatorShimManager.java @@ -0,0 +1,452 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.runtime.azbatch.driver;

import com.microsoft.azure.batch.protocol.models.CloudTask;
import com.microsoft.azure.batch.protocol.models.TaskState;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.driver.catalog.NodeDescriptor;
import org.apache.reef.proto.EvaluatorShimProtocol;
import org.apache.reef.runtime.azbatch.util.AzureBatchFileNames;
import org.apache.reef.runtime.azbatch.util.batch.AzureBatchHelper;
import org.apache.reef.runtime.azbatch.util.storage.AzureStorageClient;
import org.apache.reef.runtime.azbatch.util.command.CommandBuilder;
import org.apache.reef.runtime.azbatch.util.RemoteIdentifierParser;
import org.apache.reef.runtime.azbatch.util.TaskStatusMapper;
import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent;
import org.apache.reef.runtime.common.driver.api.ResourceReleaseEvent;
import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent;
import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager;
import org.apache.reef.runtime.common.driver.evaluator.Evaluators;
import org.apache.reef.runtime.common.driver.evaluator.pojos.State;
import org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEventImpl;
import org.apache.reef.runtime.common.driver.resourcemanager.ResourceEventImpl;
import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEventImpl;
import org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEventImpl;
import org.apache.reef.runtime.common.files.*;
import org.apache.reef.runtime.common.utils.RemoteManager;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.formats.ConfigurationSerializer;
import org.apache.reef.util.Optional;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.remote.RemoteMessage;
import org.apache.reef.wake.remote.impl.SocketRemoteIdentifier;

import javax.inject.Inject;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

import static org.apache.reef.runtime.azbatch.driver.RuntimeIdentifier.RUNTIME_NAME;

/**
 * The Driver's view of evaluator shims running in the cluster. This class serves the following purposes:
 * 1. listens for evaluator shim status messages signaling that the shim is online and ready to start the evaluator
 *    process.
 * 2. listens for {@link ResourceLaunchEvent} events and sends commands to the evaluator shims to start the
 *    evaluator process.
 * 3. listens for {@link ResourceReleaseEvent} events and sends terminate commands to the evaluator shims.
 * 4. triggers {@link org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEvent}
 *    events to update REEF Common on runtime status.
 * 5. triggers {@link org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent}
 *    events to update REEF Common on container statuses.
 */
@Private
@DriverSide
public final class AzureBatchEvaluatorShimManager
    implements EventHandler<RemoteMessage<EvaluatorShimProtocol.EvaluatorShimStatusProto>> {

  private static final Logger LOG = Logger.getLogger(AzureBatchEvaluatorShimManager.class.getName());

  /** Memory size, in MB, passed to the command builder when building the evaluator shim launch command. */
  private static final int EVALUATOR_SHIM_MEMORY_MB = 64;

  /** Resource requests for which a Batch task was submitted but no allocation was reported yet, by container id. */
  private final Map<String, ResourceRequestEvent> outstandingResourceRequests;
  private final AtomicInteger outstandingResourceRequestCount;

  /** Containers whose Batch task was already COMPLETED when the allocation was reported, by container id. */
  private final Map<String, CloudTask> failedResources;

  /** Registration handle returned by the RemoteManager; closed in {@link #onClose()}. */
  private final AutoCloseable evaluatorShimCommandChannel;

  private final AzureStorageClient azureStorageClient;
  private final REEFFileNames reefFileNames;
  private final AzureBatchFileNames azureBatchFileNames;
  private final RemoteManager remoteManager;
  private final AzureBatchHelper azureBatchHelper;
  private final AzureBatchEvaluatorShimConfigurationProvider evaluatorShimConfigurationProvider;
  private final JobJarMaker jobJarMaker;
  private final CommandBuilder launchCommandBuilder;
  private final REEFEventHandlers reefEventHandlers;
  private final ConfigurationSerializer configurationSerializer;

  private final Evaluators evaluators;

  @Inject
  AzureBatchEvaluatorShimManager(
      final AzureStorageClient azureStorageClient,
      final REEFFileNames reefFileNames,
      final AzureBatchFileNames azureBatchFileNames,
      final RemoteManager remoteManager,
      final REEFEventHandlers reefEventHandlers,
      final Evaluators evaluators,
      final CommandBuilder launchCommandBuilder,
      final AzureBatchHelper azureBatchHelper,
      final JobJarMaker jobJarMaker,
      final AzureBatchEvaluatorShimConfigurationProvider evaluatorShimConfigurationProvider,
      final ConfigurationSerializer configurationSerializer) {
    this.azureStorageClient = azureStorageClient;
    this.reefFileNames = reefFileNames;
    this.azureBatchFileNames = azureBatchFileNames;
    this.remoteManager = remoteManager;

    this.reefEventHandlers = reefEventHandlers;
    this.evaluators = evaluators;

    this.launchCommandBuilder = launchCommandBuilder;

    this.azureBatchHelper = azureBatchHelper;
    this.jobJarMaker = jobJarMaker;

    this.evaluatorShimConfigurationProvider = evaluatorShimConfigurationProvider;
    this.configurationSerializer = configurationSerializer;

    this.outstandingResourceRequests = new ConcurrentHashMap<>();
    this.outstandingResourceRequestCount = new AtomicInteger();

    this.failedResources = new ConcurrentHashMap<>();

    // Register last: this hands a reference to this object to the RemoteManager, so every other field
    // should already be assigned by the time a shim status message can be delivered to onNext().
    this.evaluatorShimCommandChannel = remoteManager
        .registerHandler(EvaluatorShimProtocol.EvaluatorShimStatusProto.class, this);
  }

  /**
   * This method is called when a resource is requested. It will add a task to the existing Azure Batch job which
   * is equivalent to requesting a container in Azure Batch. When the request is fulfilled and the evaluator shim is
   * started, it will send a message back to the driver which signals that a resource request was fulfilled.
   *
   * @param resourceRequestEvent resource request event.
   * @param containerId container id for the resource. It will be used as the task id for Azure Batch task.
   * @param jarFileUri Azure Storage SAS URI of the JAR file containing libraries required by the evaluator shim.
   * @throws RuntimeException wrapping the {@link IOException} if the Azure Batch task could not be created.
   */
  public void onResourceRequested(final ResourceRequestEvent resourceRequestEvent,
                                  final String containerId,
                                  final URI jarFileUri) {
    try {
      createAzureBatchTask(containerId, jarFileUri);
      this.outstandingResourceRequests.put(containerId, resourceRequestEvent);
      this.outstandingResourceRequestCount.incrementAndGet();
      this.updateRuntimeStatus();
    } catch (final IOException e) {
      // Logger.log(Level, String, Throwable) does not substitute "{0}", so the context goes into the message itself.
      LOG.log(Level.SEVERE, "Failed to create Azure Batch task for containerId = " + containerId, e);
      throw new RuntimeException(e);
    }
  }

  /**
   * This method is invoked by the RemoteManager when a message from the evaluator shim is received.
   * Messages whose status is not ONLINE are logged and ignored.
   *
   * @param statusMessage the message from the evaluator shim indicating that the shim has started and is ready to
   *                      start the evaluator process.
   */
  @Override
  public void onNext(final RemoteMessage<EvaluatorShimProtocol.EvaluatorShimStatusProto> statusMessage) {

    final EvaluatorShimProtocol.EvaluatorShimStatusProto message = statusMessage.getMessage();
    final String containerId = message.getContainerId();
    final String remoteId = message.getRemoteIdentifier();

    LOG.log(Level.INFO, "Got a status message from evaluator shim = {0} with containerId = {1} and status = {2}.",
        new Object[]{remoteId, containerId, message.getStatus().toString()});

    if (message.getStatus() != EvaluatorShimProtocol.EvaluatorShimStatus.ONLINE) {
      LOG.log(Level.SEVERE, "Unexpected status returned from the evaluator shim: {0}. Ignoring the message.",
          message.getStatus().toString());
      return;
    }

    this.onResourceAllocated(containerId, remoteId, Optional.<CloudTask>empty());
  }

  /**
   * This method is invoked when the Azure Batch runtime is notified that a pending resource request has been
   * fulfilled. It could happen because of two reasons:
   * 1. The driver receives a message from the evaluator shim indicating it has successfully started.
   * 2. {@link AzureBatchTaskStatusAlarmHandler} detects that the evaluator shim failed before sending the status
   *    message.
   *
   * If no outstanding request exists for the container, only a warning is logged and the runtime status is updated.
   *
   * @param containerId id of the container.
   * @param remoteId remote address for the allocated container.
   * @param cloudTask Azure Batch task which corresponds to the container.
   */
  public void onResourceAllocated(final String containerId,
                                  final String remoteId,
                                  final Optional<CloudTask> cloudTask) {
    final ResourceRequestEvent resourceRequestEvent = this.outstandingResourceRequests.remove(containerId);

    if (resourceRequestEvent == null) {
      LOG.log(Level.WARNING, "No outstanding resource request found for container id = {0}.", containerId);
    } else {
      this.outstandingResourceRequestCount.decrementAndGet();

      // We would expect the Azure Batch task to be in 'RUNNING' state. If it is in
      // 'COMPLETED' state, it cannot receive instructions and thus by definition
      // has failed.
      if (cloudTask.isPresent() && TaskState.COMPLETED.equals(cloudTask.get().state())) {
        this.failedResources.put(containerId, cloudTask.get());
      }

      final String nodeId = RemoteIdentifierParser.parseNodeId(remoteId);

      LOG.log(Level.FINEST, "Notifying REEF of a new node: {0}", remoteId);
      this.reefEventHandlers.onNodeDescriptor(NodeDescriptorEventImpl.newBuilder()
          .setIdentifier(nodeId)
          .setHostName(RemoteIdentifierParser.parseIp(remoteId))
          .setPort(RemoteIdentifierParser.parsePort(remoteId))
          .setMemorySize(resourceRequestEvent.getMemorySize().get())
          .build());

      LOG.log(Level.FINEST, "Triggering a new ResourceAllocationEvent for remoteId = {0}.", remoteId);
      this.reefEventHandlers.onResourceAllocation(
          ResourceEventImpl.newAllocationBuilder()
              .setIdentifier(containerId)
              .setNodeId(nodeId)
              .setResourceMemory(resourceRequestEvent.getMemorySize().get())
              .setVirtualCores(resourceRequestEvent.getVirtualCores().get())
              .setRuntimeName(RUNTIME_NAME)
              .build());
    }

    this.updateRuntimeStatus();
  }

  /**
   * Event handler method for {@link ResourceLaunchEvent}. This method will determine if the evaluator shim
   * is online and send the evaluator launch command to the shim to start the evaluator. If the container is
   * recorded as failed, the status of its Azure Batch task is reported to REEF instead.
   *
   * @param resourceLaunchEvent an instance of {@link ResourceLaunchEvent}
   * @param command OS command to launch the evaluator process.
   * @param evaluatorConfigurationString evaluator configuration serialized as a String.
   */
  public void onResourceLaunched(final ResourceLaunchEvent resourceLaunchEvent,
                                 final String command,
                                 final String evaluatorConfigurationString) {
    final String resourceId = resourceLaunchEvent.getIdentifier();

    // Single lookup: a containsKey()/get() pair could observe a concurrent removal by onResourceReleased()
    // in between and pass null on to onAzureBatchTaskStatus().
    final CloudTask failedTask = this.failedResources.get(resourceId);

    if (failedTask != null) {
      LOG.log(Level.FINE, "ResourceLaunch event triggered on a failed container. "
          + "Notifying REEF of failed container.");
      this.onAzureBatchTaskStatus(failedTask);
    } else if (this.evaluators.get(resourceId).isPresent()) {
      LOG.log(Level.FINE, "Preparing to launch resourceId = {0}", resourceId);
      this.launchEvaluator(resourceLaunchEvent, command, evaluatorConfigurationString);
    } else {
      LOG.log(Level.WARNING, "Received a ResourceLaunch event for an unknown resourceId = {0}", resourceId);
    }

    this.updateRuntimeStatus();
  }

  /**
   * Event handler method for {@link ResourceReleaseEvent}. Sends a TERMINATE command to the appropriate evaluator
   * shim. Release events for containers recorded as failed, or for unknown evaluators, are logged and ignored.
   *
   * @param resourceReleaseEvent the release event identifying the container to terminate.
   */
  public void onResourceReleased(final ResourceReleaseEvent resourceReleaseEvent) {
    final String resourceId = resourceReleaseEvent.getIdentifier();

    // REEF Common will trigger a ResourceReleaseEvent even if the resource has failed. Since we know that the shim
    // has already failed, we can safely ignore this.
    if (this.failedResources.remove(resourceId) != null) {
      LOG.log(Level.INFO, "Received a ResourceReleaseEvent for a failed shim with resourceId = {0}. Ignoring.",
          resourceId);
      return;
    }

    if (!this.evaluators.get(resourceId).isPresent()) {
      LOG.log(Level.WARNING, "Received a ResourceReleaseEvent for an unknown resourceId = {0}. Ignoring.",
          resourceId);
      return;
    }

    // Resolve the remote id only now: getResourceRemoteId() throws for an unknown evaluator, so resolving it
    // before the two checks above would turn an ignorable release into a RuntimeException.
    final String resourceRemoteId = getResourceRemoteId(resourceId);
    final EventHandler<EvaluatorShimProtocol.EvaluatorShimControlProto> handler =
        this.remoteManager.getHandler(resourceRemoteId, EvaluatorShimProtocol.EvaluatorShimControlProto.class);

    LOG.log(Level.INFO, "Sending TERMINATE command to the shim with remoteId = {0}.", resourceRemoteId);
    handler.onNext(
        EvaluatorShimProtocol.EvaluatorShimControlProto
            .newBuilder()
            .setCommand(EvaluatorShimProtocol.EvaluatorShimCommand.TERMINATE)
            .build());

    this.updateRuntimeStatus();
  }

  /**
   * Takes in an instance of {@link CloudTask}, generates and triggers a
   * {@link org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent}.
   * The exit code is only attached when the task is in COMPLETED state.
   *
   * @param cloudTask an instance of {@link CloudTask}.
   */
  public void onAzureBatchTaskStatus(final CloudTask cloudTask) {
    final ResourceStatusEventImpl.Builder eventBuilder =
        ResourceStatusEventImpl.newBuilder()
            .setIdentifier(cloudTask.id())
            .setState(TaskStatusMapper.getReefTaskState(cloudTask))
            .setRuntimeName(RUNTIME_NAME);

    if (TaskState.COMPLETED.equals(cloudTask.state())) {
      // NOTE(review): assumes executionInfo() and its exitCode() are populated for COMPLETED tasks - confirm.
      eventBuilder.setExitCode(cloudTask.executionInfo().exitCode());
    }

    this.reefEventHandlers.onResourceStatus(eventBuilder.build());
  }

  /**
   * Closes the evaluator shim remote manager command channel.
   *
   * @throws RuntimeException wrapping any exception raised while closing the channel.
   */
  public void onClose() {
    try {
      this.evaluatorShimCommandChannel.close();
    } catch (final Exception e) {
      LOG.log(Level.WARNING, "An unexpected exception while closing the Evaluator Shim Command channel", e);
      throw new RuntimeException(e);
    }
  }

  /**
   * A utility method which builds the evaluator shim JAR file and uploads it to Azure Storage.
   * Every file found in the REEF global folder is added to the JAR as a LIB resource.
   *
   * @return SAS URI to where the evaluator shim JAR was uploaded.
   * @throws RuntimeException wrapping the {@link IOException} if the JAR could not be built or uploaded.
   */
  public URI generateShimJarFile() {

    try {
      final Set<FileResource> globalFiles = new HashSet<>();

      final File globalFolder = new File(this.reefFileNames.getGlobalFolderPath());
      // listFiles() returns null when the path is not a directory or cannot be read.
      final File[] filesInGlobalFolder = globalFolder.listFiles();

      if (filesInGlobalFolder != null) {
        for (final File fileEntry : filesInGlobalFolder) {
          globalFiles.add(getFileResourceFromFile(fileEntry, FileType.LIB));
        }
      }

      final File jarFile = this.jobJarMaker.newBuilder()
          .addGlobalFileSet(globalFiles)
          .build();

      return uploadFile(jarFile);
    } catch (final IOException ex) {
      LOG.log(Level.SEVERE, "Failed to build JAR file", ex);
      throw new RuntimeException(ex);
    }
  }

  /**
   * Reports the runtime as RUNNING to REEF together with the current number of outstanding container requests.
   */
  private void updateRuntimeStatus() {
    this.reefEventHandlers.onRuntimeStatus(RuntimeStatusEventImpl.newBuilder()
        .setName(RUNTIME_NAME)
        .setState(State.RUNNING)
        .setOutstandingContainerRequests(this.outstandingResourceRequestCount.get())
        .build());
  }

  /**
   * Uploads the file resources of the launch event (if any) to Azure Storage and sends a LAUNCH_EVALUATOR
   * command to the evaluator shim of the given resource.
   *
   * @param resourceLaunchEvent the launch event for the resource.
   * @param command OS command to launch the evaluator process.
   * @param evaluatorConfigurationString evaluator configuration serialized as a String.
   */
  private void launchEvaluator(final ResourceLaunchEvent resourceLaunchEvent,
                               final String command,
                               final String evaluatorConfigurationString) {
    final String resourceId = resourceLaunchEvent.getIdentifier();
    final String resourceRemoteId = getResourceRemoteId(resourceId);

    final Set<FileResource> fileResources = resourceLaunchEvent.getFileSet();
    // An empty URL is sent to the shim when the launch event carries no file resources.
    String fileUrl = "";
    if (!fileResources.isEmpty()) {
      try {
        final File jarFile = writeFileResourcesJarFile(fileResources);
        fileUrl = uploadFile(jarFile).toString();
        LOG.log(Level.FINE, "Uploaded evaluator file resources to Azure Storage at {0}.", fileUrl);
      } catch (final IOException e) {
        LOG.log(Level.SEVERE, "Failed to generate zip archive for evaluator file resources", e);
        throw new RuntimeException(e);
      }
    } else {
      LOG.log(Level.INFO, "No file resources found in ResourceLaunchEvent.");
    }

    LOG.log(Level.INFO, "Sending a command to the Evaluator shim with remoteId = {0} to start the evaluator.",
        resourceRemoteId);
    final EventHandler<EvaluatorShimProtocol.EvaluatorShimControlProto> handler = this.remoteManager
        .getHandler(resourceRemoteId, EvaluatorShimProtocol.EvaluatorShimControlProto.class);

    handler.onNext(
        EvaluatorShimProtocol.EvaluatorShimControlProto
            .newBuilder()
            .setCommand(EvaluatorShimProtocol.EvaluatorShimCommand.LAUNCH_EVALUATOR)
            .setEvaluatorLaunchCommand(command)
            .setEvaluatorConfigString(evaluatorConfigurationString)
            .setEvaluatorFileResourcesUrl(fileUrl)
            .build());
  }

  /**
   * @return the command used to start the evaluator shim, built from the shim memory size and the
   * evaluator shim configuration path.
   */
  private String getEvaluatorShimLaunchCommand() {
    return this.launchCommandBuilder.buildEvaluatorShimCommand(EVALUATOR_SHIM_MEMORY_MB,
        this.azureBatchFileNames.getEvaluatorShimConfigurationPath());
  }

  /**
   * Wraps a file in a {@link FileResource} carrying the file's name, path and the given type.
   *
   * @param configFile the file to describe.
   * @param type the type to assign to the resource.
   * @return the file resource describing the file.
   */
  private FileResource getFileResourceFromFile(final File configFile, final FileType type) {
    return FileResourceImpl.newBuilder()
        .setName(configFile.getName())
        .setPath(configFile.getPath())
        .setType(type).build();
  }

  /**
   * Writes the evaluator shim configuration for the task to the REEF local folder, uploads it and submits
   * a task with the given id to the Azure Batch job.
   *
   * @param taskId id of the Azure Batch task; also used as prefix of the configuration file name.
   * @param jarFileUri URI of the evaluator shim JAR file.
   * @throws IOException if the configuration could not be written or uploaded, or the task submission failed.
   */
  private void createAzureBatchTask(final String taskId, final URI jarFileUri) throws IOException {
    final Configuration shimConfig = this.evaluatorShimConfigurationProvider.getConfiguration(taskId);
    final File shim = new File(this.reefFileNames.getLocalFolderPath(),
        taskId + '-' + this.azureBatchFileNames.getEvaluatorShimConfigurationName());
    this.configurationSerializer.toFile(shimConfig, shim);
    final URI shimUri = this.uploadFile(shim);
    this.azureBatchHelper.submitTask(this.azureBatchHelper.getAzureBatchJobId(), taskId, jarFileUri,
        shimUri, getEvaluatorShimLaunchCommand());
  }

  /**
   * Packages the given file resources, as local files, into a JAR file.
   */
  private File writeFileResourcesJarFile(final Set<FileResource> fileResourceSet) throws IOException {
    return this.jobJarMaker.newBuilder().addLocalFileSet(fileResourceSet).build();
  }

  /**
   * Uploads a file to the storage folder of the current Azure Batch job.
   *
   * @param jarFile the file to upload.
   * @return the URI returned by the storage client for the uploaded file.
   * @throws IOException if the upload fails.
   */
  private URI uploadFile(final File jarFile) throws IOException {
    final String folderName = this.azureBatchFileNames.getStorageJobFolder(this.azureBatchHelper.getAzureBatchJobId());
    LOG.log(Level.FINE, "Uploading {0} to {1}.", new Object[]{jarFile.getAbsolutePath(), folderName});

    return this.azureStorageClient.uploadFile(folderName, jarFile);
  }

  /**
   * Looks up the evaluator for a resource and returns the remote identifier of its node.
   *
   * @param resourceId id of the resource.
   * @return the socket remote identifier, as a String, built from the node's socket address.
   * @throws RuntimeException if no evaluator is known for the resource id.
   */
  private String getResourceRemoteId(final String resourceId) {
    final Optional<EvaluatorManager> optionalEvaluatorManager = this.evaluators.get(resourceId);

    if (!optionalEvaluatorManager.isPresent()) {
      LOG.log(Level.SEVERE, "Unknown evaluator with resourceId = {0}", resourceId);
      throw new RuntimeException("Unknown evaluator with resourceId = " + resourceId);
    }

    final NodeDescriptor nodeDescriptor = optionalEvaluatorManager.get().getEvaluatorDescriptor().getNodeDescriptor();
    return (new SocketRemoteIdentifier(nodeDescriptor.getInetSocketAddress())).toString();
  }
}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceLaunchHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceLaunchHandler.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceLaunchHandler.java new file mode 100644 index 0000000..b642c4d --- /dev/null +++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceLaunchHandler.java @@ -0,0 +1,55 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.reef.runtime.azbatch.driver;

import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent;
import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler;

import javax.inject.Inject;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * The {@link ResourceLaunchHandler} of the Azure Batch runtime. It forwards every
 * {@link ResourceLaunchEvent} it receives to the {@link AzureBatchResourceManager}.
 */
@Private
@DriverSide
public final class AzureBatchResourceLaunchHandler implements ResourceLaunchHandler {

  private static final Logger LOG = Logger.getLogger(AzureBatchResourceLaunchHandler.class.getName());

  /** Receiver of all launch events seen by this handler. */
  private final AzureBatchResourceManager resourceManager;

  @Inject
  AzureBatchResourceLaunchHandler(final AzureBatchResourceManager azureBatchResourceManager) {
    this.resourceManager = azureBatchResourceManager;
  }

  /**
   * Invoked when the launch of a resource is requested; logs the event and delegates to the resource manager.
   *
   * @param resourceLaunchEvent resource launch event.
   */
  @Override
  public void onNext(final ResourceLaunchEvent resourceLaunchEvent) {
    LOG.log(Level.FINEST, "Got ResourceLaunchEvent in AzureBatchResourceLaunchHandler");
    this.resourceManager.onResourceLaunched(resourceLaunchEvent);
  }
}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceManager.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceManager.java new file mode 100644 index 0000000..8fbbf17 --- /dev/null +++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceManager.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
package org.apache.reef.runtime.azbatch.driver;

import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.runtime.azbatch.util.command.CommandBuilder;
import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent;
import org.apache.reef.runtime.common.driver.api.ResourceReleaseEvent;
import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent;
import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.formats.ConfigurationSerializer;

import javax.inject.Inject;
import java.net.URI;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * The Driver's view of all resources in Azure Batch pool.
 */
@Private
@DriverSide
public final class AzureBatchResourceManager {
  private static final Logger LOG = Logger.getLogger(AzureBatchResourceManager.class.getName());

  /** Request event of every container that was requested and not yet released, by container id. */
  private final Map<String, ResourceRequestEvent> containerRequests;
  /** Number of entries tracked in {@link #containerRequests}; drives enabling/disabling of the status alarm. */
  private final AtomicInteger containerCount;

  private final ConfigurationSerializer configurationSerializer;
  private final CommandBuilder launchCommandBuilder;
  private final AzureBatchEvaluatorShimManager evaluatorShimManager;
  private final AzureBatchTaskStatusAlarmHandler azureBatchTaskStatusAlarmHandler;

  /** Computed as {@code 1.0 - jvmHeapSlack}; passed to the command builder for the evaluator command. */
  private final double jvmHeapFactor;

  @Inject
  AzureBatchResourceManager(
      final ConfigurationSerializer configurationSerializer,
      final CommandBuilder launchCommandBuilder,
      final AzureBatchEvaluatorShimManager evaluatorShimManager,
      final AzureBatchTaskStatusAlarmHandler azureBatchTaskStatusAlarmHandler,
      @Parameter(JVMHeapSlack.class) final double jvmHeapSlack) {
    this.configurationSerializer = configurationSerializer;
    this.evaluatorShimManager = evaluatorShimManager;
    this.jvmHeapFactor = 1.0 - jvmHeapSlack;
    this.launchCommandBuilder = launchCommandBuilder;
    this.containerRequests = new ConcurrentHashMap<>();
    this.containerCount = new AtomicInteger(0);
    this.azureBatchTaskStatusAlarmHandler = azureBatchTaskStatusAlarmHandler;
  }

  /**
   * This method is invoked when a {@link ResourceRequestEvent} is triggered. It builds the evaluator shim JAR
   * once, then registers and requests one container per requested resource. The task status alarm is enabled
   * whenever at least one container is tracked afterwards.
   *
   * @param resourceRequestEvent the resource request event.
   */
  public void onResourceRequested(final ResourceRequestEvent resourceRequestEvent) {
    LOG.log(Level.FINEST, "Got ResourceRequestEvent in AzureBatchResourceManager,");
    final URI jarFileUri = this.evaluatorShimManager.generateShimJarFile();
    try {
      for (int r = 0; r < resourceRequestEvent.getResourceCount(); r++) {
        final String containerId = generateContainerId();
        LOG.log(Level.FINE, "containerId in AzureBatchResourceManager {0}", containerId);
        this.containerRequests.put(containerId, resourceRequestEvent);
        this.containerCount.incrementAndGet();
        try {
          this.evaluatorShimManager.onResourceRequested(resourceRequestEvent, containerId, jarFileUri);
        } catch (final RuntimeException e) {
          // The request for this container failed: undo its bookkeeping so that the container
          // count does not stay above zero for a container that will never be released.
          if (this.containerRequests.remove(containerId) != null) {
            this.containerCount.decrementAndGet();
          }
          throw e;
        }
      }
    } finally {
      // Also reached when a later container of the same request fails, so that the containers
      // requested before the failure are still monitored.
      if (this.containerCount.get() > 0) {
        this.azureBatchTaskStatusAlarmHandler.enableAlarm();
      }
    }
  }

  /**
   * This method is invoked when a {@link ResourceReleaseEvent} is triggered. The container is removed from the
   * bookkeeping (disabling the task status alarm once no container is left) and the event is forwarded to the
   * evaluator shim manager in every case.
   *
   * @param resourceReleaseEvent the resource release event.
   */
  public void onResourceReleased(final ResourceReleaseEvent resourceReleaseEvent) {
    final String id = resourceReleaseEvent.getIdentifier();
    LOG.log(Level.FINEST, "Got ResourceReleasedEvent for Id: {0} in AzureBatchResourceManager", id);

    final ResourceRequestEvent removedEvent = this.containerRequests.remove(id);
    if (removedEvent == null) {
      LOG.log(Level.WARNING,
          "Ignoring attempt to remove non-existent containerRequest for Id: {0} in AzureBatchResourceManager", id);
    } else {
      final int currentContainerCount = this.containerCount.decrementAndGet();
      if (currentContainerCount <= 0) {
        this.azureBatchTaskStatusAlarmHandler.disableAlarm();
      }
    }

    this.evaluatorShimManager.onResourceReleased(resourceReleaseEvent);
  }

  /**
   * This method is called when the {@link ResourceLaunchEvent} is triggered. It builds the evaluator launch
   * command from the memory size of the original request and hands it, together with the serialized evaluator
   * configuration, to the evaluator shim manager.
   *
   * @param resourceLaunchEvent the resource launch event.
   * @throws IllegalStateException if no container request is tracked for the event's identifier.
   */
  public void onResourceLaunched(final ResourceLaunchEvent resourceLaunchEvent) {
    final String id = resourceLaunchEvent.getIdentifier();
    LOG.log(Level.FINEST, "Got ResourceLaunchEvent for Id: {0} in AzureBatchResourceManager", id);

    // Fail with a descriptive error instead of a bare NullPointerException when the id is not tracked
    // (never requested through this manager, or already released).
    final ResourceRequestEvent containerRequest = this.containerRequests.get(id);
    if (containerRequest == null) {
      LOG.log(Level.SEVERE, "Received a ResourceLaunchEvent for an unknown container with Id: {0}", id);
      throw new IllegalStateException("Unknown container with Id: " + id);
    }

    final int evaluatorMemory = containerRequest.getMemorySize().get();
    final String launchCommand = this.launchCommandBuilder.buildEvaluatorCommand(resourceLaunchEvent,
        evaluatorMemory, this.jvmHeapFactor);
    final String evaluatorConfigurationString =
        this.configurationSerializer.toString(resourceLaunchEvent.getEvaluatorConf());
    this.evaluatorShimManager.onResourceLaunched(resourceLaunchEvent, launchCommand, evaluatorConfigurationString);
  }

  /**
   * @return a new random UUID in String form, used as container id.
   */
  private String generateContainerId() {
    return UUID.randomUUID().toString();
  }
}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceManagerStartHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceManagerStartHandler.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceManagerStartHandler.java new file mode 100644 index 0000000..e07e91d --- /dev/null +++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceManagerStartHandler.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */
package org.apache.reef.runtime.azbatch.driver;

import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.runtime.common.driver.api.ResourceManagerStartHandler;
import org.apache.reef.wake.time.runtime.event.RuntimeStart;

import javax.inject.Inject;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * The {@link ResourceManagerStartHandler} of the Azure Batch runtime. It only logs the start event.
 */
@Private
@DriverSide
public final class AzureBatchResourceManagerStartHandler implements ResourceManagerStartHandler {

  private static final Logger LOG = Logger.getLogger(AzureBatchResourceManagerStartHandler.class.getName());

  /** Message logged at FINE level for every {@link RuntimeStart} event. */
  private static final String RUNTIME_STARTED_MESSAGE = "Azure batch runtime has been started...";

  @Inject
  AzureBatchResourceManagerStartHandler() {
  }

  /**
   * Logs that the runtime has been started; the event itself is not inspected.
   *
   * @param runtimeStart the runtime start event.
   */
  @Override
  public void onNext(final RuntimeStart runtimeStart) {
    LOG.log(Level.FINE, RUNTIME_STARTED_MESSAGE);
  }
}
http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceManagerStopHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceManagerStopHandler.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceManagerStopHandler.java new file mode 100644 index 0000000..93ef918 --- /dev/null +++
b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceManagerStopHandler.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.azbatch.driver; + +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.runtime.common.driver.api.ResourceManagerStopHandler; +import org.apache.reef.wake.time.runtime.event.RuntimeStop; + +import javax.inject.Inject; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * A {@link ResourceManagerStopHandler} for Azure Batch runtime. 
+ */ +@Private +@DriverSide +public final class AzureBatchResourceManagerStopHandler implements ResourceManagerStopHandler { + + private static final Logger LOG = Logger.getLogger(AzureBatchResourceManagerStopHandler.class.getName()); + + private final AzureBatchEvaluatorShimManager azureBatchEvaluatorShimManager; + + @Inject + AzureBatchResourceManagerStopHandler(final AzureBatchEvaluatorShimManager azureBatchEvaluatorShimManager) { + this.azureBatchEvaluatorShimManager = azureBatchEvaluatorShimManager; + } + + @Override + public void onNext(final RuntimeStop runtimeStop) { + LOG.log(Level.FINE, "Azure batch runtime has been stopped..."); + this.azureBatchEvaluatorShimManager.onClose(); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceReleaseHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceReleaseHandler.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceReleaseHandler.java new file mode 100644 index 0000000..e3da49b --- /dev/null +++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceReleaseHandler.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.azbatch.driver; + +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.runtime.common.driver.api.ResourceReleaseEvent; +import org.apache.reef.runtime.common.driver.api.ResourceReleaseHandler; + +import javax.inject.Inject; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * A {@link ResourceReleaseHandler} for Azure Batch runtime. + */ +@Private +@DriverSide +public final class AzureBatchResourceReleaseHandler implements ResourceReleaseHandler { + + private static final Logger LOG = Logger.getLogger(AzureBatchResourceReleaseHandler.class.getName()); + + private final AzureBatchResourceManager azureBatchResourceManager; + + @Inject + AzureBatchResourceReleaseHandler(final AzureBatchResourceManager azureBatchResourceManager) { + this.azureBatchResourceManager = azureBatchResourceManager; + } + + @Override + public void onNext(final ResourceReleaseEvent resourceReleaseEvent) { + LOG.log(Level.FINEST, "Got ResourceReleaseEvent in AzureBatchResourceLaunchHandler"); + this.azureBatchResourceManager.onResourceReleased(resourceReleaseEvent); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceRequestHandler.java ---------------------------------------------------------------------- diff --git 
a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceRequestHandler.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceRequestHandler.java new file mode 100644 index 0000000..fb9c000 --- /dev/null +++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchResourceRequestHandler.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.azbatch.driver; + +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent; +import org.apache.reef.runtime.common.driver.api.ResourceRequestHandler; + +import javax.inject.Inject; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * A {@link ResourceRequestHandler} for Azure Batch runtime. 
+ */ +@Private +@DriverSide +public final class AzureBatchResourceRequestHandler implements ResourceRequestHandler { + + private static final Logger LOG = Logger.getLogger(AzureBatchResourceRequestHandler.class.getName()); + private final AzureBatchResourceManager azureBatchResourceManager; + + @Inject + AzureBatchResourceRequestHandler( + final AzureBatchResourceManager azureBatchResourceManager) { + this.azureBatchResourceManager = azureBatchResourceManager; + } + + @Override + public void onNext(final ResourceRequestEvent resourceRequestEvent) { + LOG.log(Level.FINEST, "Got ResourceRequestEvent in AzureBatchResourceRequestHandler: memory = {0}, cores = {1}.", + new Object[]{resourceRequestEvent.getMemorySize(), resourceRequestEvent.getVirtualCores()}); + this.azureBatchResourceManager.onResourceRequested(resourceRequestEvent); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchTaskStatusAlarmHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchTaskStatusAlarmHandler.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchTaskStatusAlarmHandler.java new file mode 100644 index 0000000..0afe07d --- /dev/null +++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchTaskStatusAlarmHandler.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.azbatch.driver; + +import com.microsoft.azure.batch.protocol.models.CloudTask; +import com.microsoft.azure.batch.protocol.models.TaskState; +import org.apache.reef.annotations.audience.DriverSide; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.runtime.azbatch.parameters.AzureBatchTaskStatusCheckPeriod; +import org.apache.reef.runtime.azbatch.util.batch.AzureBatchHelper; +import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager; +import org.apache.reef.runtime.common.driver.evaluator.Evaluators; +import org.apache.reef.tang.InjectionFuture; +import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.util.Optional; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.time.Clock; +import org.apache.reef.wake.time.event.Alarm; + +import javax.inject.Inject; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Class that gets the status of the tasks from Azure Batch for the job that is currently in progress + * and notifies REEF of the status. + * Unlike YARN, Azure Batch does not support Resource Manager callbacks. Therefore, we must periodically call + * Azure Batch APIs to determine the status of tasks running inside our job. 
+ */ +@Private +@DriverSide +final class AzureBatchTaskStatusAlarmHandler implements EventHandler<Alarm> { + + /** + * A placeholder remote ID which is used for reporting failed containers when the + * failure occurs before Wake communication channel can be established and the real + * remote ID is unknown. + */ + private static final String PLACEHOLDER_REMOTE_ID = "socket://0.0.0.0:0"; + + private final InjectionFuture<AzureBatchEvaluatorShimManager> evaluatorShimManager; + private final AzureBatchHelper azureBatchHelper; + private final int taskStatusCheckPeriod; + private boolean isAlarmEnabled; + private final Evaluators evaluators; + private final Clock clock; + + private static final Logger LOG = Logger.getLogger(AzureBatchTaskStatusAlarmHandler.class.getName()); + + @Inject + private AzureBatchTaskStatusAlarmHandler( + final InjectionFuture<AzureBatchEvaluatorShimManager> evaluatorShimManager, + final AzureBatchHelper azureBatchHelper, + final Evaluators evaluators, + final Clock clock, + @Parameter(AzureBatchTaskStatusCheckPeriod.class) final int taskStatusCheckPeriod) { + this.evaluatorShimManager = evaluatorShimManager; + this.azureBatchHelper = azureBatchHelper; + this.evaluators = evaluators; + this.clock = clock; + this.taskStatusCheckPeriod = taskStatusCheckPeriod; + } + + /** + * This method is periodically invoked by the Runtime Clock. It will call Azure Batch APIs to determine + * the status of tasks running inside the job and notify REEF of tasks statuses that correspond to running + * evaluators. + * + * @param alarm alarm object. + */ + @Override + public void onNext(final Alarm alarm) { + String jobId = this.azureBatchHelper.getAzureBatchJobId(); + List<CloudTask> allTasks = this.azureBatchHelper.getTaskStatusForJob(jobId); + + // Report status if the task has an associated active container. 
+ LOG.log(Level.FINER, "Found {0} tasks from job id {1}", new Object[]{allTasks.size(), jobId}); + for (CloudTask task : allTasks) { + Optional<EvaluatorManager> optionalEvaluatorManager = this.evaluators.get(task.id()); + + if (!optionalEvaluatorManager.isPresent() && !TaskState.COMPLETED.equals(task.state())) { + // This usually means that the evaluator shim has started, but hasn't sent the status message + // back to the driver yet. + LOG.log(Level.FINE, "No Evaluator found for Azure Batch task id = {0}. Ignoring.", task.id()); + } else if (!optionalEvaluatorManager.isPresent() && TaskState.COMPLETED.equals(task.state())) { + // This indicates that the evaluator shim exited prematurely. We inform REEF of resource allocation + // so it's possible to trigger an event signaling resource failure later. + LOG.log(Level.INFO, "Azure Batch task id = {0} is in 'COMPLETED' state, but it does not have " + + "an Evaluator associated with it. This indicates that the evaluator shim has failed before " + + "it could send a callback to the driver.", task.id()); + this.evaluatorShimManager.get().onResourceAllocated(task.id(), PLACEHOLDER_REMOTE_ID, Optional.of(task)); + } else if (optionalEvaluatorManager.get().isClosedOrClosing()) { + LOG.log(Level.FINE, "Evaluator id = {0} is closed. Ignoring.", task.id()); + } else { + LOG.log(Level.FINE, "Reporting status for Task Id: {0} is [Azure Batch Status]:{1} ", + new Object[]{task.id(), task.state().toString()}); + this.evaluatorShimManager.get().onAzureBatchTaskStatus(task); + } + } + + synchronized (this) { + if (this.isAlarmEnabled()) { + this.scheduleAlarm(); + } + } + } + + /** + * Enable the periodic alarm to send status updates. 
+ */ + public synchronized void enableAlarm() { + if (!this.isAlarmEnabled) { + LOG.log(Level.FINE, "Enabling the alarm and scheduling it to fire in {0} ms.", this.taskStatusCheckPeriod); + this.isAlarmEnabled = true; + this.scheduleAlarm(); + } else { + LOG.log(Level.FINE, "Alarm is already enabled."); + } + } + + /** + * Disable the periodic alarm to send status updates. + */ + public synchronized void disableAlarm() { + this.isAlarmEnabled = false; + } + + private boolean isAlarmEnabled() { + return this.isAlarmEnabled; + } + + private void scheduleAlarm() { + this.clock.scheduleAlarm(this.taskStatusCheckPeriod, this); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/REEFEventHandlers.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/REEFEventHandlers.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/REEFEventHandlers.java new file mode 100644 index 0000000..9db16ec --- /dev/null +++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/REEFEventHandlers.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.azbatch.driver; + +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.runtime.common.driver.api.RuntimeParameters; +import org.apache.reef.runtime.common.driver.resourcemanager.*; +import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.wake.EventHandler; + +import javax.inject.Inject; +import java.util.logging.Logger; + +/** + * Helper that represents the REEF layer to the Azure Batch runtime. + */ +@Private +public final class REEFEventHandlers { + private final EventHandler<ResourceAllocationEvent> resourceAllocationHandler; + private final EventHandler<ResourceStatusEvent> resourceStatusHandler; + private final EventHandler<RuntimeStatusEvent> runtimeStatusHandler; + private final EventHandler<NodeDescriptorEvent> nodeDescriptorEventHandler; + private static final Logger LOG = Logger.getLogger(REEFEventHandlers.class.getName()); + + @Inject + REEFEventHandlers(@Parameter(RuntimeParameters.NodeDescriptorHandler.class) + final EventHandler<NodeDescriptorEvent> nodeDescriptorEventHandler, + @Parameter(RuntimeParameters.RuntimeStatusHandler.class) + final EventHandler<RuntimeStatusEvent> runtimeStatusProtoEventHandler, + @Parameter(RuntimeParameters.ResourceAllocationHandler.class) + final EventHandler<ResourceAllocationEvent> resourceAllocationHandler, + @Parameter(RuntimeParameters.ResourceStatusHandler.class) + final EventHandler<ResourceStatusEvent> resourceStatusHandler) { + this.resourceAllocationHandler = resourceAllocationHandler; + this.resourceStatusHandler = resourceStatusHandler; + this.runtimeStatusHandler = runtimeStatusProtoEventHandler; + this.nodeDescriptorEventHandler = nodeDescriptorEventHandler; + } + + /** + * Inform REEF of a node. 
+ * + * @param nodeDescriptorEvent + */ + void onNodeDescriptor(final NodeDescriptorEvent nodeDescriptorEvent) { + this.nodeDescriptorEventHandler.onNext(nodeDescriptorEvent); + } + + /** + * Update REEF's view on the runtime status. + * + * @param runtimeStatusEvent + */ + @Private + public void onRuntimeStatus(final RuntimeStatusEvent runtimeStatusEvent) { + this.runtimeStatusHandler.onNext(runtimeStatusEvent); + } + + /** + * Inform REEF of a fresh resource allocation. + * + * @param resourceAllocationEvent + */ + @Private + public void onResourceAllocation(final ResourceAllocationEvent resourceAllocationEvent) { + this.resourceAllocationHandler.onNext(resourceAllocationEvent); + } + + /** + * Update REEF on a change to the status of a resource. + * + * @param resourceStatusEvent + */ + void onResourceStatus(final ResourceStatusEvent resourceStatusEvent) { + this.resourceStatusHandler.onNext(resourceStatusEvent); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/RuntimeIdentifier.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/RuntimeIdentifier.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/RuntimeIdentifier.java new file mode 100644 index 0000000..aa61db0 --- /dev/null +++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/RuntimeIdentifier.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.azbatch.driver; + +import org.apache.reef.annotations.audience.Private; + +/** + * Runtime Identifier Implementation. + */ +@Private +public final class RuntimeIdentifier { + + /** + * Same value is defined on the C# side in the Org.Apache.REEF.Common.Runtime.RuntimeName. + */ + public static final String RUNTIME_NAME = "AzBatch"; + + private RuntimeIdentifier() { + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/package-info.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/package-info.java new file mode 100644 index 0000000..37addfe --- /dev/null +++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Driver-side implementation of the REEF runtime for Azure Batch. + */ +package org.apache.reef.runtime.azbatch.driver; http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/EvaluatorShim.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/EvaluatorShim.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/EvaluatorShim.java new file mode 100644 index 0000000..1579c26 --- /dev/null +++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/EvaluatorShim.java @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.azbatch.evaluator; + +import com.microsoft.windowsazure.storage.StorageException; +import com.microsoft.windowsazure.storage.blob.CloudBlob; +import com.microsoft.windowsazure.storage.blob.CloudBlockBlob; +import org.apache.commons.lang.StringUtils; +import org.apache.reef.annotations.audience.EvaluatorSide; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.proto.EvaluatorShimProtocol; +import org.apache.reef.runtime.azbatch.parameters.ContainerIdentifier; +import org.apache.reef.runtime.azbatch.util.AzureBatchFileNames; +import org.apache.reef.runtime.common.evaluator.parameters.DriverRemoteIdentifier; +import org.apache.reef.runtime.common.files.REEFFileNames; +import org.apache.reef.runtime.common.utils.RemoteManager; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.tang.exceptions.BindException; +import org.apache.reef.tang.formats.ConfigurationSerializer; +import org.apache.reef.wake.EventHandler; +import org.apache.reef.wake.remote.RemoteMessage; + +import javax.inject.Inject; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.Enumeration; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.zip.ZipEntry; +import java.util.zip.ZipFile; + +/** + * The evaluator shim acts as a wrapper process around the Evaluator. Azure Batch starts this process on the evaluator + * node at the time that the resource is allocated. 
Once started, the evaluator shim process will send a status + * message back to the Driver which triggers a + * {@link org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent} on the Driver side. + * The evaluator shim will then wait for a command from the Driver to start the Evaluator process. + * Upon receiving the command, the shim will launch the evaluator process and wait for it to exit. After receiving + * a terminate command, the evaluator shim will exit thus releasing the resource and completing the Azure Batch task. + */ +@Private +@EvaluatorSide +public final class EvaluatorShim + implements EventHandler<RemoteMessage<EvaluatorShimProtocol.EvaluatorShimControlProto>> { + private static final Logger LOG = Logger.getLogger(EvaluatorShim.class.getName()); + + private final RemoteManager remoteManager; + private final REEFFileNames reefFileNames; + private final AzureBatchFileNames azureBatchFileNames; + private final ConfigurationSerializer configurationSerializer; + + private final String driverRemoteId; + private final String containerId; + + private final EventHandler<EvaluatorShimProtocol.EvaluatorShimStatusProto> evaluatorShimStatusChannel; + private final AutoCloseable evaluatorShimCommandChannel; + + private final ExecutorService threadPool; + + private Process evaluatorProcess; + private Integer evaluatorProcessExitValue; + + @Inject + EvaluatorShim(final REEFFileNames reefFileNames, + final AzureBatchFileNames azureBatchFileNames, + final ConfigurationSerializer configurationSerializer, + final RemoteManager remoteManager, + @Parameter(DriverRemoteIdentifier.class) final String driverRemoteId, + @Parameter(ContainerIdentifier.class) final String containerId) { + this.reefFileNames = reefFileNames; + this.azureBatchFileNames = azureBatchFileNames; + this.configurationSerializer = configurationSerializer; + + this.driverRemoteId = driverRemoteId; + this.containerId = containerId; + + this.remoteManager = remoteManager; + 
this.evaluatorShimStatusChannel = this.remoteManager.getHandler(this.driverRemoteId, + EvaluatorShimProtocol.EvaluatorShimStatusProto.class); + + this.evaluatorShimCommandChannel = this.remoteManager + .registerHandler(EvaluatorShimProtocol.EvaluatorShimControlProto.class, this); + + this.threadPool = Executors.newCachedThreadPool(); + } + + /** + * Starts the {@link EvaluatorShim}. + */ + public void run() { + LOG.log(Level.FINEST, "Entering EvaluatorShim.run()."); + this.onStart(); + } + + /** + * Stops the {@link EvaluatorShim}. + */ + public void stop() { + LOG.log(Level.FINEST, "Entering EvaluatorShim.stop()."); + this.onStop(); + } + + /** + * This method is invoked by the Remote Manager when a command message from the Driver is received. + * + * @param remoteMessage the message sent to the evaluator shim by the Driver. + */ + @Override + public void onNext(final RemoteMessage<EvaluatorShimProtocol.EvaluatorShimControlProto> remoteMessage) { + final EvaluatorShimProtocol.EvaluatorShimCommand command = remoteMessage.getMessage().getCommand(); + switch (command) { + case LAUNCH_EVALUATOR: + LOG.log(Level.INFO, "Received a command to launch the Evaluator."); + this.threadPool.submit(new Runnable() { + @Override + public void run() { + EvaluatorShim.this.onEvaluatorLaunch(remoteMessage.getMessage().getEvaluatorLaunchCommand(), + remoteMessage.getMessage().getEvaluatorConfigString(), + remoteMessage.getMessage().getEvaluatorFileResourcesUrl()); + } + }); + break; + + case TERMINATE: + LOG.log(Level.INFO, "Received a command to terminate the EvaluatorShim."); + this.threadPool.submit(new Runnable() { + @Override + public void run() { + EvaluatorShim.this.onStop(); + } + }); + break; + + default: + LOG.log(Level.WARNING, "An unknown command was received by the EvaluatorShim: {0}.", command); + throw new IllegalArgumentException("An unknown command was received by the EvaluatorShim."); + } + } + + private void onStart() { + LOG.log(Level.FINEST, "Entering 
EvaluatorShim.onStart()."); + + LOG.log(Level.INFO, "Reporting back to the driver with Shim Status = {0}", + EvaluatorShimProtocol.EvaluatorShimStatus.ONLINE); + this.evaluatorShimStatusChannel.onNext( + EvaluatorShimProtocol.EvaluatorShimStatusProto + .newBuilder() + .setRemoteIdentifier(this.remoteManager.getMyIdentifier()) + .setContainerId(this.containerId) + .setStatus(EvaluatorShimProtocol.EvaluatorShimStatus.ONLINE) + .build()); + + LOG.log(Level.FINEST, "Exiting EvaluatorShim.onStart()."); + } + + private void onStop() { + LOG.log(Level.FINEST, "Entering EvaluatorShim.onStop()."); + + try { + LOG.log(Level.INFO, "Closing EvaluatorShim Control channel."); + this.evaluatorShimCommandChannel.close(); + } catch (Exception e) { + LOG.log(Level.SEVERE, "An unexpected exception occurred while attempting to close the EvaluatorShim " + + "control channel."); + throw new RuntimeException(e); + } + + try { + LOG.log(Level.INFO, "Closing the Remote Manager."); + this.remoteManager.close(); + } catch (Exception e) { + LOG.log(Level.SEVERE, "Failed to close the RemoteManager with the following exception: {0}.", e); + throw new RuntimeException(e); + } + + LOG.log(Level.INFO, "Shutting down the thread pool."); + this.threadPool.shutdown(); + + LOG.log(Level.FINEST, "Exiting EvaluatorShim.onStop()."); + } + + private void onEvaluatorLaunch(final String launchCommand, final String evaluatorConfigString, + final String fileResourcesUrl) { + LOG.log(Level.FINEST, "Entering EvaluatorShim.onEvaluatorLaunch()."); + + if (StringUtils.isNotBlank(fileResourcesUrl)) { + LOG.log(Level.FINER, "Downloading evaluator resource file archive from {0}.", fileResourcesUrl); + try { + File tmpFile = downloadFile(fileResourcesUrl); + extractFiles(tmpFile); + } catch (StorageException | IOException e) { + LOG.log(Level.SEVERE, "Failed to download evaluator file resources: {0}. 
{1}", + new Object[]{fileResourcesUrl, e}); + throw new RuntimeException(e); + } + } else { + LOG.log(Level.FINER, "No file resources URL given."); + } + + File evaluatorConfigurationFile = new File(this.reefFileNames.getEvaluatorConfigurationPath()); + LOG.log(Level.FINER, "Persisting evaluator config at: {0}", evaluatorConfigurationFile.getAbsolutePath()); + + try { + boolean newFileCreated = evaluatorConfigurationFile.createNewFile(); + LOG.log(Level.FINEST, + newFileCreated ? "Created a new file for persisting evaluator configuration at {0}." + : "Using existing file for persisting evaluator configuration at {0}.", + evaluatorConfigurationFile.getAbsolutePath()); + + Configuration evaluatorConfiguration = this.configurationSerializer.fromString(evaluatorConfigString); + this.configurationSerializer.toFile(evaluatorConfiguration, evaluatorConfigurationFile); + } catch (final IOException | BindException e) { + LOG.log(Level.SEVERE, "An unexpected exception occurred while attempting to deserialize and write " + + "Evaluator configuration file. {0}", e); + throw new RuntimeException("Unable to write configuration.", e); + } + + LOG.log(Level.INFO, "Launching the evaluator by invoking the following command: " + launchCommand); + + try { + final List<String> command = Arrays.asList(launchCommand.split(" ")); + this.evaluatorProcess = new ProcessBuilder() + .command(command) + .redirectError(new File(this.azureBatchFileNames.getEvaluatorStdErrFilename())) + .redirectOutput(new File(this.azureBatchFileNames.getEvaluatorStdOutFilename())) + .start(); + + // This will block the current thread until the Evaluator process completes. 
+ this.evaluatorProcessExitValue = EvaluatorShim.this.evaluatorProcess.waitFor(); + LOG.log(Level.INFO, "Evaluator process completed with exit value: {0}.", this.evaluatorProcessExitValue); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + + LOG.log(Level.FINEST, "Exiting EvaluatorShim.onEvaluatorLaunch()."); + } + + private File downloadFile(final String url) throws IOException, StorageException { + URI fileUri = URI.create(url); + File downloadedFile = new File(this.azureBatchFileNames.getEvaluatorResourceFilesJarName()); + LOG.log(Level.FINE, "Downloading evaluator file resources to {0}.", downloadedFile.getAbsolutePath()); + + try (FileOutputStream fileStream = new FileOutputStream(downloadedFile)) { + CloudBlob blob = new CloudBlockBlob(fileUri); + blob.download(fileStream); + } + + return downloadedFile; + } + + private void extractFiles(final File zipFile) throws IOException { + try (ZipFile zipFileHandle = new ZipFile(zipFile)) { + Enumeration<? extends ZipEntry> zipEntries = zipFileHandle.entries(); + while (zipEntries.hasMoreElements()) { + ZipEntry zipEntry = zipEntries.nextElement(); + File file = new File(this.reefFileNames.getREEFFolderName() + '/' + zipEntry.getName()); + if (file.exists()) { + LOG.log(Level.INFO, "Skipping entry {0} because the file already exists.", zipEntry.getName()); + } else { + if (zipEntry.isDirectory()) { + if (file.mkdirs()) { + LOG.log(Level.INFO, "Creating directory {0}.", zipEntry.getName()); + } else { + LOG.log(Level.INFO, "Directory {0} already exists. 
Ignoring.", zipEntry.getName()); + } + } else { + try (InputStream inputStream = zipFileHandle.getInputStream(zipEntry)) { + LOG.log(Level.INFO, "Extracting {0}.", zipEntry.getName()); + Files.copy(inputStream, Paths.get(this.reefFileNames.getREEFFolderName() + '/' + zipEntry.getName())); + LOG.log(Level.INFO, "Extracting {0} completed.", zipEntry.getName()); + } + } + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/EvaluatorShimConfiguration.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/EvaluatorShimConfiguration.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/EvaluatorShimConfiguration.java new file mode 100644 index 0000000..c114ba2 --- /dev/null +++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/EvaluatorShimConfiguration.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.runtime.azbatch.evaluator; + +import org.apache.reef.annotations.audience.EvaluatorSide; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.runtime.azbatch.parameters.ContainerIdentifier; +import org.apache.reef.runtime.common.evaluator.parameters.DriverRemoteIdentifier; +import org.apache.reef.runtime.common.launch.REEFMessageCodec; +import org.apache.reef.tang.formats.ConfigurationModule; +import org.apache.reef.tang.formats.ConfigurationModuleBuilder; +import org.apache.reef.tang.formats.RequiredParameter; +import org.apache.reef.wake.remote.RemoteConfiguration; + +/** + * ConfigurationModule to create evaluator shim configurations. + */ +@Private +@EvaluatorSide +public final class EvaluatorShimConfiguration extends ConfigurationModuleBuilder { + + /** + * @see org.apache.reef.runtime.common.evaluator.parameters.DriverRemoteIdentifier + */ + public static final RequiredParameter<String> DRIVER_REMOTE_IDENTIFIER = new RequiredParameter<>(); + + /** + * @see org.apache.reef.runtime.common.evaluator.parameters.DriverRemoteIdentifier + */ + public static final RequiredParameter<String> CONTAINER_IDENTIFIER = new RequiredParameter<>(); + + public static final ConfigurationModule CONF = new EvaluatorShimConfiguration() + .bindNamedParameter(RemoteConfiguration.MessageCodec.class, REEFMessageCodec.class) + .bindNamedParameter(DriverRemoteIdentifier.class, DRIVER_REMOTE_IDENTIFIER) + .bindNamedParameter(ContainerIdentifier.class, CONTAINER_IDENTIFIER) + .build(); +} http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/EvaluatorShimLauncher.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/EvaluatorShimLauncher.java 
b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/EvaluatorShimLauncher.java new file mode 100644 index 0000000..bfc5c1d --- /dev/null +++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/EvaluatorShimLauncher.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.azbatch.evaluator; + +import org.apache.reef.annotations.audience.EvaluatorSide; +import org.apache.reef.annotations.audience.Private; +import org.apache.reef.runtime.azbatch.parameters.EvaluatorShimConfigFilePath; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.Injector; +import org.apache.reef.tang.JavaConfigurationBuilder; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.annotations.Parameter; +import org.apache.reef.tang.formats.ConfigurationSerializer; + +import javax.inject.Inject; +import java.io.File; +import java.io.IOException; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * The main entry point into the {@link EvaluatorShim}. 
+ */ +@Private +@EvaluatorSide +public final class EvaluatorShimLauncher { + + private static final Logger LOG = Logger.getLogger(EvaluatorShimLauncher.class.getName()); + + private final String configurationFilePath; + private final ConfigurationSerializer configurationSerializer; + + @Inject + EvaluatorShimLauncher( + @Parameter(EvaluatorShimConfigFilePath.class) final String configurationFilePath, + final ConfigurationSerializer configurationSerializer) { + this.configurationFilePath = configurationFilePath; + this.configurationSerializer = configurationSerializer; + } + + /** + * Launch the {@link EvaluatorShim}. + * @throws Exception + */ + public void launch() throws Exception { + final Injector injector = Tang.Factory.getTang().newInjector(readConfigurationFromDisk(this.configurationFilePath)); + final EvaluatorShim evaluatorShim = injector.getInstance(EvaluatorShim.class); + evaluatorShim.run(); + } + + private Configuration readConfigurationFromDisk(final String configPath) { + + LOG.log(Level.FINER, "Loading configuration file: {0}", configPath); + + final File shimConfigurationFile = new File(configPath); + + if (!shimConfigurationFile.exists() || !shimConfigurationFile.canRead()) { + throw new RuntimeException( + "Configuration file " + configPath + " doesn't exist or is not readable.", + new IOException(configPath)); + } + + try { + final Configuration config = this.configurationSerializer.fromFile(shimConfigurationFile); + LOG.log(Level.FINEST, "Configuration file loaded: {0}", configPath); + return config; + } catch (final IOException e) { + throw new RuntimeException("Unable to parse the configuration file: " + configPath, e); + } + } + + private static Configuration parseCommandLine(final String[] args) { + if (args.length != 1) { + throw new RuntimeException("Expected configuration file name as an argument."); + } + + final JavaConfigurationBuilder confBuilder = Tang.Factory.getTang().newConfigurationBuilder(); + 
confBuilder.bindNamedParameter(EvaluatorShimConfigFilePath.class, args[0]); + + return confBuilder.build(); + } + + /** + * The starting point of the evaluator shim launcher. + */ + public static void main(final String[] args) throws Exception { + LOG.log(Level.INFO, "Entering EvaluatorShimLauncher.main()."); + + final Injector injector = Tang.Factory.getTang().newInjector(parseCommandLine(args)); + final EvaluatorShimLauncher launcher = injector.getInstance(EvaluatorShimLauncher.class); + launcher.launch(); + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/package-info.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/package-info.java new file mode 100644 index 0000000..cfa7b1b --- /dev/null +++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +/** + * Evaluator shim for the REEF runtime for Azure Batch. + */ +package org.apache.reef.runtime.azbatch.evaluator; http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/package-info.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/package-info.java new file mode 100644 index 0000000..e10dbd6 --- /dev/null +++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.reef.runtime.azbatch; http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchAccountKey.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchAccountKey.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchAccountKey.java new file mode 100644 index 0000000..2efe842 --- /dev/null +++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchAccountKey.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.azbatch.parameters; + +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; + +/** + * The Azure Batch account key. 
+ */ +@NamedParameter(doc = "The Azure Batch account key.") +public final class AzureBatchAccountKey implements Name<String> { +} http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchAccountName.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchAccountName.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchAccountName.java new file mode 100644 index 0000000..09f49d7 --- /dev/null +++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchAccountName.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.azbatch.parameters; + +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; + +/** + * The Azure Batch account name. 
+ */ +@NamedParameter(doc = "The Azure Batch account name.") +public final class AzureBatchAccountName implements Name<String> { +} http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchAccountUri.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchAccountUri.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchAccountUri.java new file mode 100644 index 0000000..1f2ec54 --- /dev/null +++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchAccountUri.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.azbatch.parameters; + +import org.apache.reef.tang.annotations.NamedParameter; +import org.apache.reef.tang.annotations.Name; + +/** + * The Azure Batch account URI. 
+ */ +@NamedParameter(doc = "The Azure Batch account URI.") +public final class AzureBatchAccountUri implements Name<String> { +} http://git-wip-us.apache.org/repos/asf/reef/blob/561a336f/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchPoolId.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchPoolId.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchPoolId.java new file mode 100644 index 0000000..88137f8 --- /dev/null +++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/AzureBatchPoolId.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.runtime.azbatch.parameters; + +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; + +/** + * The Azure Batch pool ID. + */ +@NamedParameter(doc = "The Azure Batch pool ID.") +public final class AzureBatchPoolId implements Name<String> { +}
