Repository: incubator-reef Updated Branches: refs/heads/master 7396f314e -> 089be44d5
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-local/src/test/java/org/apache/reef/runtime/local/driver/ResourceRequestQueueTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-local/src/test/java/org/apache/reef/runtime/local/driver/ResourceRequestQueueTest.java b/lang/java/reef-runtime-local/src/test/java/org/apache/reef/runtime/local/driver/ResourceRequestQueueTest.java index 658747d..8fb1872 100644 --- a/lang/java/reef-runtime-local/src/test/java/org/apache/reef/runtime/local/driver/ResourceRequestQueueTest.java +++ b/lang/java/reef-runtime-local/src/test/java/org/apache/reef/runtime/local/driver/ResourceRequestQueueTest.java @@ -18,7 +18,7 @@ */ package org.apache.reef.runtime.local.driver; -import org.apache.reef.proto.DriverRuntimeProtocol; +import org.apache.reef.runtime.common.driver.api.ResourceRequestEventImpl; import org.junit.Assert; import org.junit.Test; @@ -58,6 +58,6 @@ public class ResourceRequestQueueTest { } private ResourceRequest getAlmostSatisfied() { - return new ResourceRequest(DriverRuntimeProtocol.ResourceRequestProto.newBuilder().setResourceCount(1).setMemorySize(128).build()); + return new ResourceRequest(ResourceRequestEventImpl.newBuilder().setResourceCount(1).setMemorySize(128).build()); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-local/src/test/java/org/apache/reef/runtime/local/driver/ResourceRequestTest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-local/src/test/java/org/apache/reef/runtime/local/driver/ResourceRequestTest.java b/lang/java/reef-runtime-local/src/test/java/org/apache/reef/runtime/local/driver/ResourceRequestTest.java index 3156a02..8eb310a 100644 --- a/lang/java/reef-runtime-local/src/test/java/org/apache/reef/runtime/local/driver/ResourceRequestTest.java +++ b/lang/java/reef-runtime-local/src/test/java/org/apache/reef/runtime/local/driver/ResourceRequestTest.java @@ -18,7 +18,7 @@ */ package org.apache.reef.runtime.local.driver; -import org.apache.reef.proto.DriverRuntimeProtocol; +import org.apache.reef.runtime.common.driver.api.ResourceRequestEventImpl; import org.junit.Assert; import org.junit.Test; @@ -56,6 +56,6 @@ public final class ResourceRequestTest { } private ResourceRequest get(final int n) { - return new ResourceRequest(DriverRuntimeProtocol.ResourceRequestProto.newBuilder().setResourceCount(n).setMemorySize(128).build()); + return new ResourceRequest(ResourceRequestEventImpl.newBuilder().setResourceCount(n).setMemorySize(128).build()); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java index 29e1c21..c267373 100644 --- a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java +++ b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/client/MesosJobSubmissionHandler.java @@ -20,10 +20,10 @@ package org.apache.reef.runtime.mesos.client; import org.apache.reef.annotations.audience.ClientSide; import org.apache.reef.annotations.audience.Private; -import org.apache.reef.proto.ClientRuntimeProtocol; -import org.apache.reef.proto.ReefServiceProtos.FileResourceProto; +import org.apache.reef.runtime.common.client.api.JobSubmissionEvent; import org.apache.reef.runtime.common.client.api.JobSubmissionHandler; import org.apache.reef.runtime.common.files.ClasspathProvider; +import org.apache.reef.runtime.common.files.FileResource; import org.apache.reef.runtime.common.files.REEFFileNames; import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder; import org.apache.reef.runtime.common.parameters.JVMHeapSlack; @@ -78,10 +78,10 @@ final class MesosJobSubmissionHandler implements JobSubmissionHandler { } @Override - public void onNext(final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto) { + public void onNext(final JobSubmissionEvent jobSubmissionEvent) { try { final File jobFolder = new File(new File(this.rootFolderName), - "/" + jobSubmissionProto.getIdentifier() + "-" + System.currentTimeMillis() + "/"); + "/" + jobSubmissionEvent.getIdentifier() + "-" + System.currentTimeMillis() + "/"); final File driverFolder = new File(jobFolder, DRIVER_FOLDER_NAME); driverFolder.mkdirs(); @@ -91,7 +91,7 @@ final class MesosJobSubmissionHandler implements JobSubmissionHandler { final File localFolder = new File(reefFolder, this.fileNames.getLocalFolderName()); localFolder.mkdirs(); - for (final FileResourceProto file : jobSubmissionProto.getLocalFileList()) { + for (final FileResource file : jobSubmissionEvent.getLocalFileSet()) { final Path src = new File(file.getPath()).toPath(); final Path dst = new File(driverFolder, this.fileNames.getLocalFolderPath() + "/" + file.getName()).toPath(); Files.copy(src, dst, java.nio.file.StandardCopyOption.REPLACE_EXISTING); @@ -99,7 +99,7 @@ final class MesosJobSubmissionHandler implements JobSubmissionHandler { final File globalFolder = new File(reefFolder, this.fileNames.getGlobalFolderName()); globalFolder.mkdirs(); - for (final FileResourceProto file : jobSubmissionProto.getGlobalFileList()) { + for (final FileResource file : jobSubmissionEvent.getGlobalFileSet()) { final Path src = new File(file.getPath()).toPath(); final Path dst = new File(driverFolder, this.fileNames.getGlobalFolderPath() + "/" + file.getName()).toPath(); Files.copy(src, dst, java.nio.file.StandardCopyOption.REPLACE_EXISTING); @@ -108,21 +108,21 @@ final class MesosJobSubmissionHandler implements JobSubmissionHandler { final Configuration driverConfiguration = Configurations.merge(MesosDriverConfiguration.CONF .set(MesosDriverConfiguration.MESOS_MASTER_IP, this.masterIp) - .set(MesosDriverConfiguration.JOB_IDENTIFIER, jobSubmissionProto.getIdentifier()) - .set(MesosDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, jobSubmissionProto.getRemoteId()) + .set(MesosDriverConfiguration.JOB_IDENTIFIER, jobSubmissionEvent.getIdentifier()) + .set(MesosDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, jobSubmissionEvent.getRemoteId()) .set(MesosDriverConfiguration.JVM_HEAP_SLACK, this.jvmSlack) .set(MesosDriverConfiguration.SCHEDULER_DRIVER_CAPACITY, 1) // must be 1 as there is 1 scheduler at the same time .build(), - this.configurationSerializer.fromString(jobSubmissionProto.getConfiguration())); + jobSubmissionEvent.getConfiguration()); final File runtimeConfigurationFile = new File(driverFolder, this.fileNames.getDriverConfigurationPath()); this.configurationSerializer.toFile(driverConfiguration, runtimeConfigurationFile); final List<String> launchCommand = new JavaLaunchCommandBuilder() - .setErrorHandlerRID(jobSubmissionProto.getRemoteId()) - .setLaunchID(jobSubmissionProto.getIdentifier()) + .setErrorHandlerRID(jobSubmissionEvent.getRemoteId()) + .setLaunchID(jobSubmissionEvent.getIdentifier()) .setConfigurationFileName(this.fileNames.getDriverConfigurationPath()) .setClassPath(this.classpath.getDriverClasspath()) - .setMemory(jobSubmissionProto.getDriverMemory()) + .setMemory(jobSubmissionEvent.getDriverMemory().get()) .build(); final File errFile = new File(driverFolder, fileNames.getDriverStderrFileName()); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceLaunchHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceLaunchHandler.java b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceLaunchHandler.java index 7ce98f8..b239b80 100644 --- a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceLaunchHandler.java +++ b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceLaunchHandler.java @@ -22,7 +22,7 @@ import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; import org.apache.reef.io.TempFileCreator; import org.apache.reef.io.WorkingDirectoryTempFileCreator; -import org.apache.reef.proto.DriverRuntimeProtocol; +import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent; import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler; import org.apache.reef.runtime.common.files.ClasspathProvider; import org.apache.reef.runtime.common.files.JobJarMaker; @@ -78,15 +78,15 @@ final class MesosResourceLaunchHandler implements ResourceLaunchHandler { @Override - public void onNext(final DriverRuntimeProtocol.ResourceLaunchProto resourceLaunchProto) { + public void onNext(final ResourceLaunchEvent resourceLaunchEvent) { try { - LOG.log(Level.INFO, "resourceLaunchProto. {0}", resourceLaunchProto.toString()); + LOG.log(Level.INFO, "resourceLaunch. {0}", resourceLaunchEvent.toString()); final File localStagingFolder = Files.createTempDirectory(this.fileNames.getEvaluatorFolderPrefix()).toFile(); final Configuration evaluatorConfiguration = Tang.Factory.getTang() - .newConfigurationBuilder(this.configurationSerializer.fromString(resourceLaunchProto.getEvaluatorConf())) + .newConfigurationBuilder(resourceLaunchEvent.getEvaluatorConf()) .bindImplementation(TempFileCreator.class, WorkingDirectoryTempFileCreator.class) .build(); @@ -94,15 +94,15 @@ final class MesosResourceLaunchHandler implements ResourceLaunchHandler { localStagingFolder, this.fileNames.getEvaluatorConfigurationName()); this.configurationSerializer.toFile(evaluatorConfiguration, configurationFile); - JobJarMaker.copy(resourceLaunchProto.getFileList(), localStagingFolder); + JobJarMaker.copy(resourceLaunchEvent.getFileSet(), localStagingFolder); final FileSystem fileSystem = FileSystem.get(new org.apache.hadoop.conf.Configuration()); - final Path hdfsFolder = new Path(fileSystem.getUri() + "/" + resourceLaunchProto.getIdentifier() + "/"); + final Path hdfsFolder = new Path(fileSystem.getUri() + "/" + resourceLaunchEvent.getIdentifier() + "/"); FileUtil.copy(localStagingFolder, fileSystem, hdfsFolder, false, new org.apache.hadoop.conf.Configuration()); // TODO: Replace REEFExecutor with a simple launch command (we only need to launch REEFExecutor) final LaunchCommandBuilder commandBuilder; - switch (resourceLaunchProto.getType()) { + switch (resourceLaunchEvent.getType()) { case JVM: commandBuilder = new JavaLaunchCommandBuilder().setClassPath(this.classpath.getEvaluatorClasspath()); break; @@ -115,13 +115,13 @@ final class MesosResourceLaunchHandler implements ResourceLaunchHandler { final List<String> command = commandBuilder .setErrorHandlerRID(this.remoteManager.getMyIdentifier()) - .setLaunchID(resourceLaunchProto.getIdentifier()) + .setLaunchID(resourceLaunchEvent.getIdentifier()) .setConfigurationFileName(this.fileNames.getEvaluatorConfigurationPath()) - .setMemory((int) (this.jvmHeapFactor * this.executors.getMemory(resourceLaunchProto.getIdentifier()))) + .setMemory((int) (this.jvmHeapFactor * this.executors.getMemory(resourceLaunchEvent.getIdentifier()))) .build(); this.executors.launchEvaluator( - new EvaluatorLaunch(resourceLaunchProto.getIdentifier(), StringUtils.join(command, ' '))); + new EvaluatorLaunch(resourceLaunchEvent.getIdentifier(), StringUtils.join(command, ' '))); } catch (IOException e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceReleaseHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceReleaseHandler.java b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceReleaseHandler.java index 41c487e..4c34aef 100644 --- a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceReleaseHandler.java +++ b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceReleaseHandler.java @@ -20,7 +20,7 @@ package org.apache.reef.runtime.mesos.driver; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; -import org.apache.reef.proto.DriverRuntimeProtocol; +import org.apache.reef.runtime.common.driver.api.ResourceReleaseEvent; import org.apache.reef.runtime.common.driver.api.ResourceReleaseHandler; import javax.inject.Inject; @@ -36,7 +36,7 @@ final class MesosResourceReleaseHandler implements ResourceReleaseHandler { } @Override - public void onNext(final DriverRuntimeProtocol.ResourceReleaseProto resourceReleaseProto) { - REEFScheduler.onResourceRelease(resourceReleaseProto); + public void onNext(final ResourceReleaseEvent resourceReleaseEvent) { + REEFScheduler.onResourceRelease(resourceReleaseEvent); } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceRequestHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceRequestHandler.java b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceRequestHandler.java index a9c1016..feca5b8 100644 --- a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceRequestHandler.java +++ b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/MesosResourceRequestHandler.java @@ -20,7 +20,7 @@ package org.apache.reef.runtime.mesos.driver; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; -import org.apache.reef.proto.DriverRuntimeProtocol; +import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent; import org.apache.reef.runtime.common.driver.api.ResourceRequestHandler; import javax.inject.Inject; @@ -36,7 +36,7 @@ final class MesosResourceRequestHandler implements ResourceRequestHandler { } @Override - public void onNext(final DriverRuntimeProtocol.ResourceRequestProto resourceRequestProto) { - REEFScheduler.onResourceRequest(resourceRequestProto); + public void onNext(final ResourceRequestEvent resourceRequestEvent) { + REEFScheduler.onResourceRequest(resourceRequestEvent); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFEventHandlers.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFEventHandlers.java b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFEventHandlers.java index fd5cce2..c8a5fa6 100644 --- a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFEventHandlers.java +++ b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFEventHandlers.java @@ -19,11 +19,11 @@ package org.apache.reef.runtime.mesos.driver; import org.apache.reef.annotations.audience.Private; -import org.apache.reef.proto.DriverRuntimeProtocol.NodeDescriptorProto; -import org.apache.reef.proto.DriverRuntimeProtocol.ResourceAllocationProto; -import org.apache.reef.proto.DriverRuntimeProtocol.ResourceStatusProto; -import org.apache.reef.proto.DriverRuntimeProtocol.RuntimeStatusProto; import org.apache.reef.runtime.common.driver.api.RuntimeParameters; +import org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEvent; +import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent; +import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent; +import org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEvent; import org.apache.reef.tang.annotations.Parameter; import org.apache.reef.wake.EventHandler; @@ -31,35 +31,35 @@ import javax.inject.Inject; @Private final class REEFEventHandlers { - private final EventHandler<ResourceAllocationProto> resourceAllocationEventHandler; - private final EventHandler<RuntimeStatusProto> runtimeStatusEventHandler; - private final EventHandler<NodeDescriptorProto> nodeDescriptorEventHandler; - private final EventHandler<ResourceStatusProto> resourceStatusHandlerEventHandler; + private final EventHandler<ResourceAllocationEvent> resourceAllocationEventHandler; + private final EventHandler<RuntimeStatusEvent> runtimeStatusEventHandler; + private final EventHandler<NodeDescriptorEvent> nodeDescriptorEventHandler; + private final EventHandler<ResourceStatusEvent> resourceStatusHandlerEventHandler; @Inject - REEFEventHandlers(final @Parameter(RuntimeParameters.ResourceAllocationHandler.class) EventHandler<ResourceAllocationProto> resourceAllocationEventHandler, - final @Parameter(RuntimeParameters.RuntimeStatusHandler.class) EventHandler<RuntimeStatusProto> runtimeStatusEventHandler, - final @Parameter(RuntimeParameters.NodeDescriptorHandler.class) EventHandler<NodeDescriptorProto> nodeDescriptorEventHandler, - final @Parameter(RuntimeParameters.ResourceStatusHandler.class) EventHandler<ResourceStatusProto> resourceStatusHandlerEventHandler) { + REEFEventHandlers(final @Parameter(RuntimeParameters.ResourceAllocationHandler.class) EventHandler<ResourceAllocationEvent> resourceAllocationEventHandler, + final @Parameter(RuntimeParameters.RuntimeStatusHandler.class) EventHandler<RuntimeStatusEvent> runtimeStatusEventHandler, + final @Parameter(RuntimeParameters.NodeDescriptorHandler.class) EventHandler<NodeDescriptorEvent> nodeDescriptorEventHandler, + final @Parameter(RuntimeParameters.ResourceStatusHandler.class) EventHandler<ResourceStatusEvent> resourceStatusHandlerEventHandler) { this.resourceAllocationEventHandler = resourceAllocationEventHandler; this.runtimeStatusEventHandler = runtimeStatusEventHandler; this.nodeDescriptorEventHandler = nodeDescriptorEventHandler; this.resourceStatusHandlerEventHandler = resourceStatusHandlerEventHandler; } - void onNodeDescriptor(final NodeDescriptorProto nodeDescriptorProto) { + void onNodeDescriptor(final NodeDescriptorEvent nodeDescriptorProto) { this.nodeDescriptorEventHandler.onNext(nodeDescriptorProto); } - void onRuntimeStatus(final RuntimeStatusProto runtimeStatusProto) { + void onRuntimeStatus(final RuntimeStatusEvent runtimeStatusProto) { this.runtimeStatusEventHandler.onNext(runtimeStatusProto); } - void onResourceAllocation(final ResourceAllocationProto resourceAllocationProto) { + void onResourceAllocation(final ResourceAllocationEvent resourceAllocationProto) { this.resourceAllocationEventHandler.onNext(resourceAllocationProto); } - void onResourceStatus(final ResourceStatusProto resourceStatusProto) { + void onResourceStatus(final ResourceStatusEvent resourceStatusProto) { this.resourceStatusHandlerEventHandler.onNext(resourceStatusProto); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFScheduler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFScheduler.java b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFScheduler.java index 9c2c6d9..ba11aae 100644 --- a/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFScheduler.java +++ b/lang/java/reef-runtime-mesos/src/main/java/org/apache/reef/runtime/mesos/driver/REEFScheduler.java @@ -20,22 +20,23 @@ package org.apache.reef.runtime.mesos.driver; import com.google.protobuf.ByteString; import org.apache.mesos.MesosSchedulerDriver; -import org.apache.reef.proto.DriverRuntimeProtocol; -import org.apache.reef.proto.DriverRuntimeProtocol.NodeDescriptorProto; -import org.apache.reef.proto.DriverRuntimeProtocol.ResourceAllocationProto; -import org.apache.reef.proto.DriverRuntimeProtocol.ResourceReleaseProto; -import org.apache.reef.proto.DriverRuntimeProtocol.ResourceRequestProto; -import org.apache.reef.proto.DriverRuntimeProtocol.RuntimeStatusProto; -import org.apache.reef.proto.DriverRuntimeProtocol.RuntimeStatusProto.Builder; import org.apache.reef.proto.ReefServiceProtos; import org.apache.reef.proto.ReefServiceProtos.State; import org.apache.reef.runtime.common.driver.api.AbstractDriverRuntimeConfiguration; +import org.apache.reef.runtime.common.driver.api.ResourceReleaseEvent; +import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent; +import org.apache.reef.runtime.common.driver.api.ResourceRequestEventImpl; +import org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEventImpl; +import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent; +import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEventImpl; +import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent; +import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEventImpl; +import org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEventImpl; import org.apache.reef.runtime.common.files.ClasspathProvider; import org.apache.reef.runtime.common.files.REEFFileNames; import org.apache.reef.runtime.mesos.driver.parameters.MesosMasterIp; import org.apache.reef.runtime.mesos.evaluator.REEFExecutor; import org.apache.reef.runtime.mesos.util.EvaluatorControl; -import org.apache.reef.runtime.mesos.util.EvaluatorLaunch; import org.apache.reef.runtime.mesos.util.EvaluatorRelease; import org.apache.reef.runtime.mesos.util.MesosRemoteManager; import org.apache.reef.tang.annotations.Parameter; @@ -106,8 +107,8 @@ final class REEFScheduler implements Scheduler { private final Map<String, Offer> offers = new ConcurrentHashMap<>(); private int outstandingRequestCounter = 0; - private final ConcurrentLinkedQueue<ResourceRequestProto> outstandingRequests = new ConcurrentLinkedQueue<>(); - private final Map<String, ResourceRequestProto> executorIdToLaunchedRequests = new ConcurrentHashMap<>(); + private final ConcurrentLinkedQueue<ResourceRequestEvent> outstandingRequests = new ConcurrentLinkedQueue<>(); + private final Map<String, ResourceRequestEvent> executorIdToLaunchedRequests = new ConcurrentHashMap<>(); private final REEFExecutors executors; @Inject @@ -151,24 +152,24 @@ final class REEFScheduler implements Scheduler { */ @Override public void resourceOffers(final SchedulerDriver driver, final List<Protos.Offer> offers) { - final Map<String, NodeDescriptorProto.Builder> nodeDescriptorProtos = new HashMap<>(); + final Map<String, NodeDescriptorEventImpl.Builder> nodeDescriptorEvents = new HashMap<>(); for (final Offer offer : offers) { - if (nodeDescriptorProtos.get(offer.getSlaveId().getValue()) == null) { - nodeDescriptorProtos.put(offer.getSlaveId().getValue(), NodeDescriptorProto.newBuilder() - .setIdentifier(offer.getSlaveId().getValue()) - .setHostName(offer.getHostname()) - .setPort(MESOS_SLAVE_PORT) - .setMemorySize(getMemory(offer))); + if (nodeDescriptorEvents.get(offer.getSlaveId().getValue()) == null) { + nodeDescriptorEvents.put(offer.getSlaveId().getValue(), NodeDescriptorEventImpl.newBuilder() + .setIdentifier(offer.getSlaveId().getValue()) + .setHostName(offer.getHostname()) + .setPort(MESOS_SLAVE_PORT) + .setMemorySize(getMemory(offer))); } else { - final NodeDescriptorProto.Builder builder = nodeDescriptorProtos.get(offer.getSlaveId().getValue()); - builder.setMemorySize(builder.getMemorySize() + getMemory(offer)); + final NodeDescriptorEventImpl.Builder builder = nodeDescriptorEvents.get(offer.getSlaveId().getValue()); + builder.setMemorySize(builder.build().getMemorySize() + getMemory(offer)); } this.offers.put(offer.getId().getValue(), offer); } - for (final NodeDescriptorProto.Builder ndpBuilder : nodeDescriptorProtos.values()) { + for (final NodeDescriptorEventImpl.Builder ndpBuilder : nodeDescriptorEvents.values()) { this.reefEventHandlers.onNodeDescriptor(ndpBuilder.build()); } @@ -190,8 +191,8 @@ final class REEFScheduler implements Scheduler { public void statusUpdate(final SchedulerDriver driver, final Protos.TaskStatus taskStatus) { LOG.log(Level.SEVERE, "Task Status Update:", taskStatus.toString()); - final DriverRuntimeProtocol.ResourceStatusProto.Builder resourceStatus = - DriverRuntimeProtocol.ResourceStatusProto.newBuilder().setIdentifier(taskStatus.getTaskId().getValue()); + final ResourceStatusEventImpl.Builder resourceStatus = + ResourceStatusEventImpl.newBuilder().setIdentifier(taskStatus.getTaskId().getValue()); switch(taskStatus.getState()) { case TASK_STARTING: @@ -251,8 +252,8 @@ final class REEFScheduler implements Scheduler { final Protos.SlaveID slaveId, final int status) { final String diagnostics = "Executor Lost. executorid: "+executorId.getValue()+" slaveid: "+slaveId.getValue(); - final DriverRuntimeProtocol.ResourceStatusProto resourceStatus = - DriverRuntimeProtocol.ResourceStatusProto.newBuilder() + final ResourceStatusEvent resourceStatus = + ResourceStatusEventImpl.newBuilder() .setIdentifier(executorId.getValue()) .setState(State.FAILED) .setExitCode(status) @@ -283,39 +284,39 @@ final class REEFScheduler implements Scheduler { } } - public void onResourceRequest(final ResourceRequestProto resourceRequestProto) { - this.outstandingRequestCounter += resourceRequestProto.getResourceCount(); + public void onResourceRequest(final ResourceRequestEvent resourceRequestEvent) { + this.outstandingRequestCounter += resourceRequestEvent.getResourceCount(); updateRuntimeStatus(); - doResourceRequest(resourceRequestProto); + doResourceRequest(resourceRequestEvent); } - public void onResourceRelease(final ResourceReleaseProto resourceReleaseProto) { - this.executors.releaseEvaluator(new EvaluatorRelease(resourceReleaseProto.getIdentifier())); - this.executors.remove(resourceReleaseProto.getIdentifier()); + public void onResourceRelease(final ResourceReleaseEvent resourceReleaseEvent) { + this.executors.releaseEvaluator(new EvaluatorRelease(resourceReleaseEvent.getIdentifier())); + this.executors.remove(resourceReleaseEvent.getIdentifier()); updateRuntimeStatus(); } /** * Greedily acquire resources by launching a Mesos Task(w/ our custom MesosExecutor) on REEF Evaluator request. * Either called from onResourceRequest(for a new request) or resourceOffers(for an outstanding request). - * TODO: reflect priority and rack/node locality specified in resourceRequestProto. + * TODO: reflect priority and rack/node locality specified in resourceRequestEvent. */ - private synchronized void doResourceRequest(final ResourceRequestProto resourceRequestProto) { - int tasksToLaunchCounter = resourceRequestProto.getResourceCount(); + private synchronized void doResourceRequest(final ResourceRequestEvent resourceRequestEvent) { + int tasksToLaunchCounter = resourceRequestEvent.getResourceCount(); for (final Offer offer : this.offers.values()) { - final int cpuSlots = getCpu(offer) / resourceRequestProto.getVirtualCores(); - final int memSlots = getMemory(offer) / resourceRequestProto.getMemorySize(); + final int cpuSlots = getCpu(offer) / resourceRequestEvent.getVirtualCores().get(); + final int memSlots = getMemory(offer) / resourceRequestEvent.getMemorySize().get(); final int taskNum = Math.min(Math.min(cpuSlots, memSlots), tasksToLaunchCounter); - if (taskNum > 0 && satisfySlaveConstraint(resourceRequestProto, offer)) { + if (taskNum > 0 && satisfySlaveConstraint(resourceRequestEvent, offer)) { final List<TaskInfo> tasksToLaunch = new ArrayList<>(); tasksToLaunchCounter -= taskNum; // Launch as many MesosTasks on the same node(offer) as possible to exploit locality. for (int j = 0; j < taskNum; j++) { final String id = offer.getId().getValue() + "-" + String.valueOf(j); - final String executorLaunchCommand = getExecutorLaunchCommand(id, resourceRequestProto.getMemorySize()); + final String executorLaunchCommand = getExecutorLaunchCommand(id, resourceRequestEvent.getMemorySize().get()); final ExecutorInfo executorInfo = ExecutorInfo.newBuilder() .setExecutorId(ExecutorID.newBuilder() @@ -334,24 +335,24 @@ final class REEFScheduler implements Scheduler { .setName(id) .setSlaveId(offer.getSlaveId()) .addResources(Resource.newBuilder() - .setName("mem") - .setType(Type.SCALAR) - .setScalar(Value.Scalar.newBuilder() - .setValue(resourceRequestProto.getMemorySize()) + .setName("mem") + .setType(Type.SCALAR) + .setScalar(Value.Scalar.newBuilder() + .setValue(resourceRequestEvent.getMemorySize().get()) + .build()) .build()) - .build()) .addResources(Resource.newBuilder() - .setName("cpus") - .setType(Type.SCALAR) - .setScalar(Value.Scalar.newBuilder() - .setValue(resourceRequestProto.getVirtualCores()) + .setName("cpus") + .setType(Type.SCALAR) + .setScalar(Value.Scalar.newBuilder() + .setValue(resourceRequestEvent.getVirtualCores().get()) + .build()) .build()) - .build()) .setExecutor(executorInfo) .build(); tasksToLaunch.add(taskInfo); - this.executorIdToLaunchedRequests.put(id, resourceRequestProto); + this.executorIdToLaunchedRequests.put(id, resourceRequestEvent); } final Filters filters = Filters.newBuilder().setRefuseSeconds(0).build(); @@ -365,24 +366,24 @@ final class REEFScheduler implements Scheduler { this.offers.clear(); // Save leftovers that couldn't be launched - outstandingRequests.add(ResourceRequestProto.newBuilder() - .mergeFrom(resourceRequestProto) + outstandingRequests.add(ResourceRequestEventImpl.newBuilder() + .mergeFrom(resourceRequestEvent) .setResourceCount(tasksToLaunchCounter) .build()); } private void handleNewExecutor(final Protos.TaskStatus taskStatus) { - final ResourceRequestProto resourceRequestProto = + final ResourceRequestEvent resourceRequestProto = this.executorIdToLaunchedRequests.remove(taskStatus.getTaskId().getValue()); final EventHandler<EvaluatorControl> evaluatorControlHandler = this.mesosRemoteManager.getHandler(taskStatus.getMessage(), EvaluatorControl.class); - this.executors.add(taskStatus.getTaskId().getValue(), resourceRequestProto.getMemorySize(), evaluatorControlHandler); + this.executors.add(taskStatus.getTaskId().getValue(), resourceRequestProto.getMemorySize().get(), evaluatorControlHandler); - final ResourceAllocationProto alloc = DriverRuntimeProtocol.ResourceAllocationProto.newBuilder() + final ResourceAllocationEvent alloc = ResourceAllocationEventImpl.newBuilder() .setIdentifier(taskStatus.getTaskId().getValue()) .setNodeId(taskStatus.getSlaveId().getValue()) - .setResourceMemory(resourceRequestProto.getMemorySize()) + .setResourceMemory(resourceRequestProto.getMemorySize().get()) .build(); reefEventHandlers.onResourceAllocation(alloc); @@ -391,7 +392,7 @@ final class REEFScheduler implements Scheduler { } private synchronized void updateRuntimeStatus() { - final Builder builder = DriverRuntimeProtocol.RuntimeStatusProto.newBuilder() + final RuntimeStatusEventImpl.Builder builder = RuntimeStatusEventImpl.newBuilder() .setName(RUNTIME_NAME) .setState(State.RUNNING) .setOutstandingContainerRequests(this.outstandingRequestCounter); @@ -411,7 +412,7 @@ final class REEFScheduler implements Scheduler { throw new RuntimeException(e); } - final Builder runtimeStatusBuilder = RuntimeStatusProto.newBuilder() + final RuntimeStatusEventImpl.Builder runtimeStatusBuilder = RuntimeStatusEventImpl.newBuilder() .setState(State.FAILED) .setName(RUNTIME_NAME); @@ -425,9 +426,9 @@ final class REEFScheduler implements Scheduler { this.reefEventHandlers.onRuntimeStatus(runtimeStatusBuilder.build()); } - private boolean satisfySlaveConstraint(final ResourceRequestProto resourceRequestProto, final Offer offer) { - return resourceRequestProto.getNodeNameCount() == 0 || - resourceRequestProto.getNodeNameList().contains(offer.getSlaveId().getValue()); + private boolean satisfySlaveConstraint(final ResourceRequestEvent resourceRequestEvent, final Offer offer) { + return resourceRequestEvent.getNodeNameList().size() == 0 || + resourceRequestEvent.getNodeNameList().contains(offer.getSlaveId().getValue()); } private int getMemory(final Offer offer) { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java index eb6b802..62800e7 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/client/YarnJobSubmissionHandler.java @@ -25,7 +25,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.reef.annotations.audience.ClientSide; import org.apache.reef.annotations.audience.Private; import org.apache.reef.driver.parameters.DriverJobSubmissionDirectory; -import org.apache.reef.proto.ClientRuntimeProtocol; +import org.apache.reef.runtime.common.client.api.JobSubmissionEvent; import org.apache.reef.runtime.common.client.api.JobSubmissionHandler; import org.apache.reef.runtime.common.files.ClasspathProvider; import org.apache.reef.runtime.common.files.JobJarMaker; @@ -85,28 +85,28 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler { } @Override - public void onNext(final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto) { + public void onNext(final JobSubmissionEvent jobSubmissionEvent) { - LOG.log(Level.FINEST, "Submitting job with ID [{0}]", jobSubmissionProto.getIdentifier()); + LOG.log(Level.FINEST, "Submitting job with ID [{0}]", jobSubmissionEvent.getIdentifier()); try (final YarnSubmissionHelper submissionHelper = new YarnSubmissionHelper(this.yarnConfiguration, this.fileNames, this.classpath)) { LOG.log(Level.FINE, "Assembling submission JAR for the Driver."); final JobFolder jobFolderOnDfs = this.uploader.createJobFolder(submissionHelper.getApplicationId()); - final Configuration driverConfiguration = makeDriverConfiguration(jobSubmissionProto, jobFolderOnDfs.getPath()); - final File jobSubmissionFile = this.jobJarMaker.createJobSubmissionJAR(jobSubmissionProto, driverConfiguration); + final Configuration driverConfiguration = makeDriverConfiguration(jobSubmissionEvent, jobFolderOnDfs.getPath()); + final File jobSubmissionFile = this.jobJarMaker.createJobSubmissionJAR(jobSubmissionEvent, driverConfiguration); final LocalResource driverJarOnDfs = jobFolderOnDfs.uploadAsLocalResource(jobSubmissionFile); submissionHelper .addLocalResource(this.fileNames.getREEFFolderName(), driverJarOnDfs) - .setApplicationName(jobSubmissionProto.getIdentifier()) - .setDriverMemory(jobSubmissionProto.getDriverMemory()) - .setPriority(getPriority(jobSubmissionProto)) - .setQueue(getQueue(jobSubmissionProto, "default")) - .submit(jobSubmissionProto.getRemoteId()); + .setApplicationName(jobSubmissionEvent.getIdentifier()) + .setDriverMemory(jobSubmissionEvent.getDriverMemory().get()) + .setPriority(getPriority(jobSubmissionEvent)) + .setQueue(getQueue(jobSubmissionEvent, "default")) + .submit(jobSubmissionEvent.getRemoteId()); - LOG.log(Level.FINEST, "Submitted job with ID [{0}]", jobSubmissionProto.getIdentifier()); + LOG.log(Level.FINEST, "Submitted job with ID [{0}]", jobSubmissionEvent.getIdentifier()); } catch (final YarnException | IOException e) { throw new RuntimeException("Unable to submit Driver to YARN.", e); } @@ -116,9 +116,9 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler { * Assembles the Driver configuration. */ private Configuration makeDriverConfiguration( - final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto, + final JobSubmissionEvent jobSubmissionEvent, final Path jobFolderPath) throws IOException { - final Configuration config = this.configurationSerializer.fromString(jobSubmissionProto.getConfiguration()); + final Configuration config = jobSubmissionEvent.getConfiguration(); final String userBoundJobSubmissionDirectory = config.getNamedParameter((NamedParameterNode<?>) config.getClassHierarchy().getNode(ReflectionUtilities.getFullName(DriverJobSubmissionDirectory.class))); LOG.log(Level.FINE, "user bound job submission Directory: " + userBoundJobSubmissionDirectory); final String finalJobFolderPath = @@ -127,27 +127,24 @@ final class YarnJobSubmissionHandler implements JobSubmissionHandler { return Configurations.merge( YarnDriverConfiguration.CONF .set(YarnDriverConfiguration.JOB_SUBMISSION_DIRECTORY, finalJobFolderPath) - .set(YarnDriverConfiguration.JOB_IDENTIFIER, jobSubmissionProto.getIdentifier()) - .set(YarnDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, jobSubmissionProto.getRemoteId()) + .set(YarnDriverConfiguration.JOB_IDENTIFIER, jobSubmissionEvent.getIdentifier()) + .set(YarnDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, jobSubmissionEvent.getRemoteId()) .set(YarnDriverConfiguration.JVM_HEAP_SLACK, this.jvmSlack) .build(), - this.configurationSerializer.fromString(jobSubmissionProto.getConfiguration())); + config); } - private static int getPriority(final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto) { - return jobSubmissionProto.hasPriority() ? jobSubmissionProto.getPriority() : 0; + private static int getPriority(final JobSubmissionEvent jobSubmissionEvent) { + return jobSubmissionEvent.getPriority().orElse(0); } /** - * Extract the queue name from the jobSubmissionProto or return default if none is set. + * Extract the queue name from the jobSubmissionEvent or return default if none is set. * <p/> * TODO: Revisit this. We also have a named parameter for the queue in YarnClientConfiguration. */ - private final String getQueue(final ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto, + private final String getQueue(final JobSubmissionEvent jobSubmissionEvent, final String defaultQueue) { - return jobSubmissionProto.hasQueue() && !jobSubmissionProto.getQueue().isEmpty() ? - jobSubmissionProto.getQueue() : defaultQueue; + return jobSubmissionEvent.getQueue().orElse(defaultQueue); } - - } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/EvaluatorSetupHelper.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/EvaluatorSetupHelper.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/EvaluatorSetupHelper.java index 3a2ff5c..a264b66 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/EvaluatorSetupHelper.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/EvaluatorSetupHelper.java @@ -23,7 +23,7 @@ import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.io.TempFileCreator; import org.apache.reef.io.WorkingDirectoryTempFileCreator; -import org.apache.reef.proto.DriverRuntimeProtocol; +import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent; import org.apache.reef.runtime.common.files.JobJarMaker; import org.apache.reef.runtime.common.files.REEFFileNames; import org.apache.reef.runtime.common.parameters.DeleteTempFiles; @@ -88,12 +88,12 @@ final class EvaluatorSetupHelper { /** * Sets up the LocalResources for a new Evaluator. * - * @param resourceLaunchProto + * @param resourceLaunchEvent * @return * @throws IOException */ Map<String, LocalResource> getResources( - final DriverRuntimeProtocol.ResourceLaunchProto resourceLaunchProto) + final ResourceLaunchEvent resourceLaunchEvent) throws IOException { final Map<String, LocalResource> result = new HashMap<>(); @@ -103,10 +103,10 @@ final class EvaluatorSetupHelper { // Write the configuration final File configurationFile = new File(localStagingFolder, this.fileNames.getEvaluatorConfigurationName()); - this.configurationSerializer.toFile(makeEvaluatorConfiguration(resourceLaunchProto), configurationFile); + this.configurationSerializer.toFile(makeEvaluatorConfiguration(resourceLaunchEvent), configurationFile); // Copy files to the staging folder - JobJarMaker.copy(resourceLaunchProto.getFileList(), localStagingFolder); + JobJarMaker.copy(resourceLaunchEvent.getFileSet(), localStagingFolder); // Make a JAR file out of it final File localFile = tempFileCreator.createTempFile( @@ -132,15 +132,15 @@ final class EvaluatorSetupHelper { /** * Assembles the configuration for an Evaluator. * - * @param resourceLaunchProto + * @param resourceLaunchEvent * @return * @throws IOException */ - private Configuration makeEvaluatorConfiguration(final DriverRuntimeProtocol.ResourceLaunchProto resourceLaunchProto) + private Configuration makeEvaluatorConfiguration(final ResourceLaunchEvent resourceLaunchEvent) throws IOException { return Tang.Factory.getTang() - .newConfigurationBuilder(this.configurationSerializer.fromString(resourceLaunchProto.getEvaluatorConf())) + .newConfigurationBuilder(resourceLaunchEvent.getEvaluatorConf()) .bindImplementation(TempFileCreator.class, WorkingDirectoryTempFileCreator.class) .build(); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/REEFEventHandlers.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/REEFEventHandlers.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/REEFEventHandlers.java index ec43666..a204add 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/REEFEventHandlers.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/REEFEventHandlers.java @@ -19,8 +19,11 @@ package org.apache.reef.runtime.yarn.driver; import org.apache.reef.annotations.audience.Private; -import org.apache.reef.proto.DriverRuntimeProtocol; import org.apache.reef.runtime.common.driver.api.RuntimeParameters; +import org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEvent; +import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent; +import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent; +import org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEvent; import org.apache.reef.tang.annotations.Parameter; import org.apache.reef.wake.EventHandler; @@ -32,20 +35,20 @@ import javax.inject.Inject; // This is a great place to add a thread boundary, should that need arise. @Private final class REEFEventHandlers implements AutoCloseable { - private final EventHandler<DriverRuntimeProtocol.ResourceAllocationProto> resourceAllocationHandler; - private final EventHandler<DriverRuntimeProtocol.ResourceStatusProto> resourceStatusHandler; - private final EventHandler<DriverRuntimeProtocol.RuntimeStatusProto> runtimeStatusHandler; - private final EventHandler<DriverRuntimeProtocol.NodeDescriptorProto> nodeDescriptorProtoEventHandler; + private final EventHandler<ResourceAllocationEvent> resourceAllocationHandler; + private final EventHandler<ResourceStatusEvent> resourceStatusHandler; + private final EventHandler<RuntimeStatusEvent> runtimeStatusHandler; + private final EventHandler<NodeDescriptorEvent> nodeDescriptorEventHandler; @Inject - REEFEventHandlers(final @Parameter(RuntimeParameters.NodeDescriptorHandler.class) EventHandler<DriverRuntimeProtocol.NodeDescriptorProto> nodeDescriptorProtoEventHandler, - final @Parameter(RuntimeParameters.RuntimeStatusHandler.class) EventHandler<DriverRuntimeProtocol.RuntimeStatusProto> runtimeStatusProtoEventHandler, - final @Parameter(RuntimeParameters.ResourceAllocationHandler.class) EventHandler<DriverRuntimeProtocol.ResourceAllocationProto> resourceAllocationHandler, - final @Parameter(RuntimeParameters.ResourceStatusHandler.class) EventHandler<DriverRuntimeProtocol.ResourceStatusProto> resourceStatusHandler) { + REEFEventHandlers(final @Parameter(RuntimeParameters.NodeDescriptorHandler.class) EventHandler<NodeDescriptorEvent> nodeDescriptorEventHandler, + final @Parameter(RuntimeParameters.RuntimeStatusHandler.class) EventHandler<RuntimeStatusEvent> runtimeStatusProtoEventHandler, + final @Parameter(RuntimeParameters.ResourceAllocationHandler.class) EventHandler<ResourceAllocationEvent> resourceAllocationHandler, + final @Parameter(RuntimeParameters.ResourceStatusHandler.class) EventHandler<ResourceStatusEvent> resourceStatusHandler) { this.resourceAllocationHandler = resourceAllocationHandler; this.resourceStatusHandler = resourceStatusHandler; this.runtimeStatusHandler = runtimeStatusProtoEventHandler; - this.nodeDescriptorProtoEventHandler = nodeDescriptorProtoEventHandler; + this.nodeDescriptorEventHandler = nodeDescriptorEventHandler; } /** @@ -53,35 +56,35 @@ final class REEFEventHandlers implements AutoCloseable { * * @param nodeDescriptorProto */ - void onNodeDescriptor(final DriverRuntimeProtocol.NodeDescriptorProto nodeDescriptorProto) { - this.nodeDescriptorProtoEventHandler.onNext(nodeDescriptorProto); + void onNodeDescriptor(final NodeDescriptorEvent nodeDescriptorProto) { + this.nodeDescriptorEventHandler.onNext(nodeDescriptorProto); } /** * Update REEF's view on the runtime status. * - * @param runtimeStatusProto + * @param runtimeStatusEvent */ - void onRuntimeStatus(final DriverRuntimeProtocol.RuntimeStatusProto runtimeStatusProto) { - this.runtimeStatusHandler.onNext(runtimeStatusProto); + void onRuntimeStatus(final RuntimeStatusEvent runtimeStatusEvent) { + this.runtimeStatusHandler.onNext(runtimeStatusEvent); } /** * Inform REEF of a fresh resource allocation. * - * @param resourceAllocationProto + * @param resourceAllocationEvent */ - void onResourceAllocation(final DriverRuntimeProtocol.ResourceAllocationProto resourceAllocationProto) { - this.resourceAllocationHandler.onNext(resourceAllocationProto); + void onResourceAllocation(final ResourceAllocationEvent resourceAllocationEvent) { + this.resourceAllocationHandler.onNext(resourceAllocationEvent); } /** * Update REEF on a change to the status of a resource. * - * @param resourceStatusProto + * @param resourceStatusEvent */ - void onResourceStatus(final DriverRuntimeProtocol.ResourceStatusProto resourceStatusProto) { - this.resourceStatusHandler.onNext(resourceStatusProto); + void onResourceStatus(final ResourceStatusEvent resourceStatusEvent) { + this.resourceStatusHandler.onNext(resourceStatusEvent); } @Override http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceLaunchHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceLaunchHandler.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceLaunchHandler.java index 073cf8c..8401e67 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceLaunchHandler.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceLaunchHandler.java @@ -23,7 +23,7 @@ import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.LocalResource; -import org.apache.reef.proto.DriverRuntimeProtocol; +import org.apache.reef.runtime.common.driver.api.ResourceLaunchEvent; import org.apache.reef.runtime.common.driver.api.ResourceLaunchHandler; import org.apache.reef.runtime.common.files.ClasspathProvider; import org.apache.reef.runtime.common.files.REEFFileNames; @@ -73,18 +73,18 @@ public final class YARNResourceLaunchHandler implements ResourceLaunchHandler { } @Override - public void onNext(final DriverRuntimeProtocol.ResourceLaunchProto resourceLaunchProto) { + public void onNext(final ResourceLaunchEvent resourceLaunchEvent) { try { - final String containerId = resourceLaunchProto.getIdentifier(); - LOG.log(Level.FINEST, "TIME: Start ResourceLaunchProto {0}", containerId); + final String containerId = resourceLaunchEvent.getIdentifier(); + LOG.log(Level.FINEST, "TIME: Start ResourceLaunch {0}", containerId); final Container container = this.containers.get(containerId); LOG.log(Level.FINEST, "Setting up container launch container for id={0}", container.getId()); final Map<String, LocalResource> localResources = - this.evaluatorSetupHelper.getResources(resourceLaunchProto); + this.evaluatorSetupHelper.getResources(resourceLaunchEvent); final LaunchCommandBuilder commandBuilder; - switch (resourceLaunchProto.getType()) { + switch (resourceLaunchEvent.getType()) { case JVM: commandBuilder = new JavaLaunchCommandBuilder() .setClassPath(this.classpath.getEvaluatorClasspath()); @@ -94,12 +94,12 @@ public final class YARNResourceLaunchHandler implements ResourceLaunchHandler { break; default: throw new IllegalArgumentException( - "Unsupported container type: " + resourceLaunchProto.getType()); + "Unsupported container type: " + resourceLaunchEvent.getType()); } final List<String> command = commandBuilder - .setErrorHandlerRID(resourceLaunchProto.getRemoteId()) - .setLaunchID(resourceLaunchProto.getIdentifier()) + .setErrorHandlerRID(resourceLaunchEvent.getRemoteId()) + .setLaunchID(resourceLaunchEvent.getIdentifier()) .setConfigurationFileName(this.filenames.getEvaluatorConfigurationPath()) .setMemory((int) (this.jvmHeapFactor * container.getResource().getMemory())) .setStandardErr(ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/" + this.filenames.getEvaluatorStderrFileName()) @@ -115,10 +115,10 @@ public final class YARNResourceLaunchHandler implements ResourceLaunchHandler { final ContainerLaunchContext ctx = YarnTypes.getContainerLaunchContext(command, localResources); this.yarnContainerManager.get().submit(container, ctx); - LOG.log(Level.FINEST, "TIME: End ResourceLaunchProto {0}", containerId); + LOG.log(Level.FINEST, "TIME: End ResourceLaunch {0}", containerId); } catch (final Throwable e) { - LOG.log(Level.WARNING, "Error handling resource launch message: " + resourceLaunchProto, e); + LOG.log(Level.WARNING, "Error handling resource launch message: " + resourceLaunchEvent, e); throw new RuntimeException(e); } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceReleaseHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceReleaseHandler.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceReleaseHandler.java index dda9fb3..9ce3c69 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceReleaseHandler.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YARNResourceReleaseHandler.java @@ -18,7 +18,7 @@ */ package org.apache.reef.runtime.yarn.driver; -import org.apache.reef.proto.DriverRuntimeProtocol; +import org.apache.reef.runtime.common.driver.api.ResourceReleaseEvent; import org.apache.reef.runtime.common.driver.api.ResourceReleaseHandler; import org.apache.reef.tang.InjectionFuture; @@ -42,8 +42,8 @@ public final class YARNResourceReleaseHandler implements ResourceReleaseHandler } @Override - public void onNext(final DriverRuntimeProtocol.ResourceReleaseProto resourceReleaseProto) { - final String containerId = resourceReleaseProto.getIdentifier(); + public void onNext(final ResourceReleaseEvent resourceReleaseEvent) { + final String containerId = resourceReleaseEvent.getIdentifier(); LOG.log(Level.FINEST, "Releasing container {0}", containerId); this.yarnContainerManager.get().release(containerId); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java index bd8128b..37fc784 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java @@ -33,14 +33,13 @@ import org.apache.hadoop.yarn.client.api.async.NMClientAsync; import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.reef.proto.DriverRuntimeProtocol; -import org.apache.reef.proto.DriverRuntimeProtocol.NodeDescriptorProto; -import org.apache.reef.proto.DriverRuntimeProtocol.ResourceAllocationProto; -import org.apache.reef.proto.DriverRuntimeProtocol.ResourceStatusProto; -import org.apache.reef.proto.DriverRuntimeProtocol.RuntimeStatusProto; import org.apache.reef.proto.ReefServiceProtos; import org.apache.reef.runtime.common.driver.DriverStatusManager; import org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager; +import org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEventImpl; +import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEventImpl; +import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEventImpl; +import org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEventImpl; import org.apache.reef.runtime.yarn.driver.parameters.YarnHeartbeatPeriod; import org.apache.reef.runtime.yarn.util.YarnTypes; import org.apache.reef.tang.annotations.Parameter; @@ -140,7 +139,7 @@ final class YarnContainerManager @Override public void onShutdownRequest() { - this.reefEventHandlers.onRuntimeStatus(RuntimeStatusProto.newBuilder() + this.reefEventHandlers.onRuntimeStatus(RuntimeStatusEventImpl.newBuilder() .setName(RUNTIME_NAME).setState(ReefServiceProtos.State.DONE).build()); this.driverStatusManager.onError(new Exception("Shutdown requested by YARN.")); } @@ -182,8 +181,8 @@ final class YarnContainerManager public final void onContainerStopped(final ContainerId containerId) { final boolean hasContainer = this.containers.hasContainer(containerId.toString()); if (hasContainer) { - final ResourceStatusProto.Builder resourceStatusBuilder = - ResourceStatusProto.newBuilder().setIdentifier(containerId.toString()); + final ResourceStatusEventImpl.Builder resourceStatusBuilder = + ResourceStatusEventImpl.newBuilder().setIdentifier(containerId.toString()); resourceStatusBuilder.setState(ReefServiceProtos.State.DONE); this.reefEventHandlers.onResourceStatus(resourceStatusBuilder.build()); } @@ -292,7 +291,7 @@ final class YarnContainerManager private void onNodeReport(final NodeReport nodeReport) { LOG.log(Level.FINE, "Send node descriptor: {0}", nodeReport); - this.reefEventHandlers.onNodeDescriptor(NodeDescriptorProto.newBuilder() + this.reefEventHandlers.onNodeDescriptor(NodeDescriptorEventImpl.newBuilder() .setIdentifier(nodeReport.getNodeId().toString()) .setHostName(nodeReport.getNodeId().getHost()) .setPort(nodeReport.getNodeId().getPort()) @@ -303,8 +302,8 @@ final class YarnContainerManager private void handleContainerError(final ContainerId containerId, final Throwable throwable) { - final ResourceStatusProto.Builder resourceStatusBuilder = - ResourceStatusProto.newBuilder().setIdentifier(containerId.toString()); + final ResourceStatusEventImpl.Builder resourceStatusBuilder = + ResourceStatusEventImpl.newBuilder().setIdentifier(containerId.toString()); resourceStatusBuilder.setState(ReefServiceProtos.State.FAILED); resourceStatusBuilder.setExitCode(1); @@ -364,8 +363,8 @@ final class YarnContainerManager if (hasContainer) { LOG.log(Level.FINE, "Received container status: {0}", containerId); - final ResourceStatusProto.Builder status = - ResourceStatusProto.newBuilder().setIdentifier(containerId); + final ResourceStatusEventImpl.Builder status = + ResourceStatusEventImpl.newBuilder().setIdentifier(containerId); switch (value.getState()) { case COMPLETE: @@ -445,7 +444,7 @@ final class YarnContainerManager doHomogeneousRequests(); LOG.log(Level.FINEST, "Allocated Container: memory = {0}, core number = {1}", new Object[]{container.getResource().getMemory(), container.getResource().getVirtualCores()}); - this.reefEventHandlers.onResourceAllocation(ResourceAllocationProto.newBuilder() + this.reefEventHandlers.onResourceAllocation(ResourceAllocationEventImpl.newBuilder() .setIdentifier(container.getId().toString()) .setNodeId(container.getNodeId().toString()) .setResourceMemory(container.getResource().getMemory()) @@ -507,8 +506,8 @@ final class YarnContainerManager */ private void updateRuntimeStatus() { - final DriverRuntimeProtocol.RuntimeStatusProto.Builder builder = - DriverRuntimeProtocol.RuntimeStatusProto.newBuilder() + final RuntimeStatusEventImpl.Builder builder = + RuntimeStatusEventImpl.newBuilder() .setName(RUNTIME_NAME) .setState(ReefServiceProtos.State.RUNNING) .setOutstandingContainerRequests(this.containerRequestCounter.get()); @@ -533,7 +532,7 @@ final class YarnContainerManager this.resourceManager.stop(); } - final RuntimeStatusProto.Builder runtimeStatusBuilder = RuntimeStatusProto.newBuilder() + final RuntimeStatusEventImpl.Builder runtimeStatusBuilder = RuntimeStatusEventImpl.newBuilder() .setState(ReefServiceProtos.State.FAILED) .setName(RUNTIME_NAME); @@ -590,7 +589,7 @@ final class YarnContainerManager LOG.log(Level.WARNING, "Container [" + containerId + "] has failed during driver restart process, FailedEvaluaorHandler will be triggered, but no additional evaluator can be requested due to YARN-2433."); // trigger a failed evaluator event - this.reefEventHandlers.onResourceStatus(ResourceStatusProto.newBuilder() + this.reefEventHandlers.onResourceStatus(ResourceStatusEventImpl.newBuilder() .setIdentifier(containerId) .setState(ReefServiceProtos.State.FAILED) .setExitCode(1) http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/089be44d/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnResourceRequestHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnResourceRequestHandler.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnResourceRequestHandler.java index 6762362..9b8fcc0 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnResourceRequestHandler.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnResourceRequestHandler.java @@ -24,7 +24,7 @@ import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.util.Records; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; -import org.apache.reef.proto.DriverRuntimeProtocol; +import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent; import org.apache.reef.runtime.common.driver.api.ResourceRequestHandler; import javax.inject.Inject; @@ -51,41 +51,41 @@ public final class YarnResourceRequestHandler implements ResourceRequestHandler } @Override - public synchronized void onNext(final DriverRuntimeProtocol.ResourceRequestProto resourceRequestProto) { - LOG.log(Level.FINEST, "Got ResourceRequestProto in YarnResourceRequestHandler: memory = {0}, cores = {1}.", new Object[]{resourceRequestProto.getMemorySize(), resourceRequestProto.getVirtualCores()}); + public synchronized void onNext(final ResourceRequestEvent resourceRequestEvent) { + LOG.log(Level.FINEST, "Got ResourceRequestEvent in YarnResourceRequestHandler: memory = {0}, cores = {1}.", new Object[]{resourceRequestEvent.getMemorySize(), resourceRequestEvent.getVirtualCores()}); - final String[] nodes = resourceRequestProto.getNodeNameCount() == 0 ? null : - resourceRequestProto.getNodeNameList().toArray(new String[resourceRequestProto.getNodeNameCount()]); - final String[] racks = resourceRequestProto.getRackNameCount() == 0 ? null : - resourceRequestProto.getRackNameList().toArray(new String[resourceRequestProto.getRackNameCount()]); + final String[] nodes = resourceRequestEvent.getNodeNameList().size() == 0 ? null : + resourceRequestEvent.getNodeNameList().toArray(new String[resourceRequestEvent.getNodeNameList().size()]); + final String[] racks = resourceRequestEvent.getRackNameList().size() == 0 ? null : + resourceRequestEvent.getRackNameList().toArray(new String[resourceRequestEvent.getRackNameList().size()]); // set the priority for the request - final Priority pri = getPriority(resourceRequestProto); - final Resource resource = getResource(resourceRequestProto); - final boolean relax_locality = !resourceRequestProto.hasRelaxLocality() || resourceRequestProto.getRelaxLocality(); + final Priority pri = getPriority(resourceRequestEvent); + final Resource resource = getResource(resourceRequestEvent); + final boolean relax_locality = resourceRequestEvent.getRelaxLocality().orElse(true); final AMRMClient.ContainerRequest[] containerRequests = - new AMRMClient.ContainerRequest[resourceRequestProto.getResourceCount()]; + new AMRMClient.ContainerRequest[resourceRequestEvent.getResourceCount()]; - for (int i = 0; i < resourceRequestProto.getResourceCount(); i++) { + for (int i = 0; i < resourceRequestEvent.getResourceCount(); i++) { containerRequests[i] = new AMRMClient.ContainerRequest(resource, nodes, racks, pri, relax_locality); } this.yarnContainerRequestHandler.onContainerRequest(containerRequests); } - private synchronized Resource getResource(final DriverRuntimeProtocol.ResourceRequestProto resourceRequestProto) { + private synchronized Resource getResource(final ResourceRequestEvent resourceRequestEvent) { final Resource result = Records.newRecord(Resource.class); - final int memory = getMemory(resourceRequestProto.getMemorySize()); - final int core = resourceRequestProto.getVirtualCores(); + final int memory = getMemory(resourceRequestEvent.getMemorySize().get()); + final int core = resourceRequestEvent.getVirtualCores().get(); LOG.log(Level.FINEST, "Resource requested: memory = {0}, virtual core count = {1}.", new Object[]{memory, core}); result.setMemory(memory); result.setVirtualCores(core); return result; } - private synchronized Priority getPriority(final DriverRuntimeProtocol.ResourceRequestProto resourceRequestProto) { + private synchronized Priority getPriority(final ResourceRequestEvent resourceRequestEvent) { final Priority pri = Records.newRecord(Priority.class); - pri.setPriority(resourceRequestProto.hasPriority() ? resourceRequestProto.getPriority() : 1); + pri.setPriority(resourceRequestEvent.getPriority().orElse(1)); return pri; }
