Repository: incubator-reef Updated Branches: refs/heads/master a89fb6859 -> 92806d0df
[REEF-574] Eliminate Task Configuration (de-)serialization for .NET Currently Task configuration is passed over the bridge as a string. At Java side, it is deserialized into a Configuration object with C# class hierarchy. Before it is passed over to Evaluator, it is serialized again into a string. This adds overloading methods to allow the task configuration passing around as a string without deserialization. And then sent it directly to the Evaluator. Those methods are added to the impl classes but not interface to avaoid being exposed as public interface. JIRA: [REEF-574](https://issues.apache.org/jira/browse/REEF-574) Pull Request: This closes #355 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/92806d0d Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/92806d0d Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/92806d0d Branch: refs/heads/master Commit: 92806d0df809c8a05d165422420e716bddc29d9e Parents: a89fb68 Author: Julia Wang <[email protected]> Authored: Thu Aug 6 20:34:28 2015 -0700 Committer: Markus Weimer <[email protected]> Committed: Sat Aug 15 13:10:48 2015 -0700 ---------------------------------------------------------------------- .../reef/javabridge/ActiveContextBridge.java | 14 ++-- .../javabridge/AllocatedEvaluatorBridge.java | 16 +++-- .../common/driver/context/EvaluatorContext.java | 5 +- .../evaluator/AllocatedEvaluatorImpl.java | 68 +++++++++++++------- 4 files changed, 66 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/92806d0d/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridge.java index 47df761..46f44fd 100644 --- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridge.java +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridge.java @@ -20,8 +20,8 @@ package org.apache.reef.javabridge; import org.apache.reef.driver.context.ActiveContext; import org.apache.reef.io.naming.Identifiable; +import org.apache.reef.runtime.common.driver.context.EvaluatorContext; import org.apache.reef.tang.ClassHierarchy; -import org.apache.reef.tang.Configuration; import org.apache.reef.tang.formats.AvroConfigurationSerializer; import java.util.logging.Level; @@ -51,15 +51,9 @@ public final class ActiveContextBridge extends NativeBridge implements Identifia throw new RuntimeException("empty taskConfigurationString provided."); } - final Configuration taskConfiguration; - try { - taskConfiguration = serializer.fromString(taskConfigurationString, clrClassHierarchy); - } catch (final Exception e) { - final String message = "Unable to de-serialize CLR task configurations using class hierarchy."; - LOG.log(Level.SEVERE, message, e); - throw new RuntimeException(message, e); - } - jactiveContext.submitTask(taskConfiguration); + //when submit over the bridge, we would keep the task configuration as a serialized string + //submitTask(String taskConfig) is not exposed in the interface. Therefore cast is necessary. + ((EvaluatorContext)jactiveContext).submitTask(taskConfigurationString); } public String getEvaluatorDescriptorSring() { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/92806d0d/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/AllocatedEvaluatorBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/AllocatedEvaluatorBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/AllocatedEvaluatorBridge.java index 6272046..482cd16 100644 --- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/AllocatedEvaluatorBridge.java +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/AllocatedEvaluatorBridge.java @@ -18,6 +18,7 @@ */ package org.apache.reef.javabridge; +import org.apache.reef.runtime.common.driver.evaluator.AllocatedEvaluatorImpl; import org.apache.reef.driver.evaluator.AllocatedEvaluator; import org.apache.reef.tang.ClassHierarchy; import org.apache.reef.tang.Configuration; @@ -70,13 +71,16 @@ public final class AllocatedEvaluatorBridge extends NativeBridge { final Configuration taskConfiguration; try { contextConfiguration = serializer.fromString(contextConfigurationString, clrClassHierarchy); - taskConfiguration = serializer.fromString(taskConfigurationString, clrClassHierarchy); } catch (final Exception e) { final String message = "Unable to de-serialize CLR context or task configurations using class hierarchy."; LOG.log(Level.SEVERE, message, e); throw new RuntimeException(message, e); } - jallocatedEvaluator.submitContextAndTask(contextConfiguration, taskConfiguration); + + //When submit over the bridge, we would keep the task configuration as a serialized string. + //submitContextAndTask(final Configuration contextConfiguration, + //final String taskConfiguration) is not exposed in the interface. Therefore cast is necessary. + ((AllocatedEvaluatorImpl)jallocatedEvaluator).submitContextAndTask(contextConfiguration, taskConfigurationString); } /** @@ -150,14 +154,18 @@ public final class AllocatedEvaluatorBridge extends NativeBridge { try { contextConfiguration = serializer.fromString(contextConfigurationString, clrClassHierarchy); servicetConfiguration = serializer.fromString(serviceConfigurationString, clrClassHierarchy); - taskConfiguration = serializer.fromString(taskConfigurationString, clrClassHierarchy); } catch (final Exception e) { final String message = "Unable to de-serialize CLR context or service or task configurations using class hierarchy."; LOG.log(Level.SEVERE, message, e); throw new RuntimeException(message, e); } - jallocatedEvaluator.submitContextAndServiceAndTask(contextConfiguration, servicetConfiguration, taskConfiguration); + + //When submit over the bridge, we would keep the task configuration as a serialized string. + //submitContextAndServiceAndTask(final Configuration contextConfiguration, final Configuration serviceConfiguration, + //final String taskConfiguration) is not exposed in the interface. Therefore cast is necessary. + ((AllocatedEvaluatorImpl)jallocatedEvaluator) + .submitContextAndServiceAndTask(contextConfiguration, servicetConfiguration, taskConfigurationString); } /** http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/92806d0d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/EvaluatorContext.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/EvaluatorContext.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/EvaluatorContext.java index 552f84a..ee0232d 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/EvaluatorContext.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/EvaluatorContext.java @@ -124,7 +124,10 @@ public final class EvaluatorContext implements ActiveContext { @Override public synchronized void submitTask(final Configuration taskConf) { + submitTask(this.configurationSerializer.toString(taskConf)); + } + public synchronized void submitTask(final String taskConf) { if (this.isClosed) { throw new RuntimeException("Active context already closed"); } @@ -137,7 +140,7 @@ public final class EvaluatorContext implements ActiveContext { .setStartTask( EvaluatorRuntimeProtocol.StartTaskProto.newBuilder() .setContextId(this.contextIdentifier) - .setConfiguration(this.configurationSerializer.toString(taskConf)) + .setConfiguration(taskConf) .build()) .build(); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/92806d0d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/AllocatedEvaluatorImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/AllocatedEvaluatorImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/AllocatedEvaluatorImpl.java index 5dc0502..31b2ec7 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/AllocatedEvaluatorImpl.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/AllocatedEvaluatorImpl.java @@ -21,13 +21,7 @@ package org.apache.reef.runtime.common.driver.evaluator; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; import org.apache.reef.driver.context.ContextConfiguration; -import org.apache.reef.driver.evaluator.AllocatedEvaluator; -import org.apache.reef.driver.evaluator.CLRProcessFactory; -import org.apache.reef.driver.evaluator.EvaluatorDescriptor; -import org.apache.reef.driver.evaluator.EvaluatorType; -import org.apache.reef.driver.evaluator.EvaluatorProcess; -import org.apache.reef.driver.evaluator.JVMProcessFactory; -import org.apache.reef.driver.evaluator.CLRProcess; +import org.apache.reef.driver.evaluator.*; import org.apache.reef.runtime.common.driver.api.ResourceLaunchEventImpl; import org.apache.reef.runtime.common.evaluator.EvaluatorConfiguration; import org.apache.reef.tang.Configuration; @@ -52,7 +46,7 @@ import java.util.logging.Logger; */ @DriverSide @Private -final class AllocatedEvaluatorImpl implements AllocatedEvaluator { +public final class AllocatedEvaluatorImpl implements AllocatedEvaluator { private static final Logger LOG = Logger.getLogger(AllocatedEvaluatorImpl.class.getName()); @@ -112,6 +106,13 @@ final class AllocatedEvaluatorImpl implements AllocatedEvaluator { } + public void submitTask(final String taskConfiguration) { + final Configuration contextConfiguration = ContextConfiguration.CONF + .set(ContextConfiguration.IDENTIFIER, "RootContext_" + this.getId()) + .build(); + this.submitContextAndTask(contextConfiguration, taskConfiguration); + } + @Override public EvaluatorDescriptor getEvaluatorDescriptor() { return this.evaluatorManager.getEvaluatorDescriptor(); @@ -120,13 +121,13 @@ final class AllocatedEvaluatorImpl implements AllocatedEvaluator { @Override public void submitContext(final Configuration contextConfiguration) { - launch(contextConfiguration, Optional.<Configuration>empty(), Optional.<Configuration>empty()); + launchWithTaskString(contextConfiguration, Optional.<Configuration>empty(), Optional.<String>empty()); } @Override public void submitContextAndService(final Configuration contextConfiguration, final Configuration serviceConfiguration) { - launch(contextConfiguration, Optional.of(serviceConfiguration), Optional.<Configuration>empty()); + launchWithTaskString(contextConfiguration, Optional.of(serviceConfiguration), Optional.<String>empty()); } @Override @@ -135,6 +136,12 @@ final class AllocatedEvaluatorImpl implements AllocatedEvaluator { launch(contextConfiguration, Optional.<Configuration>empty(), Optional.of(taskConfiguration)); } + public void submitContextAndTask(final Configuration contextConfiguration, + final String taskConfiguration) { + launchWithTaskString(contextConfiguration, Optional.<Configuration>empty(), Optional.of(taskConfiguration)); + } + + @Override public void submitContextAndServiceAndTask(final Configuration contextConfiguration, final Configuration serviceConfiguration, @@ -142,6 +149,12 @@ final class AllocatedEvaluatorImpl implements AllocatedEvaluator { launch(contextConfiguration, Optional.of(serviceConfiguration), Optional.of(taskConfiguration)); } + public void submitContextAndServiceAndTask(final Configuration contextConfiguration, + final Configuration serviceConfiguration, + final String taskConfiguration) { + launchWithTaskString(contextConfiguration, Optional.of(serviceConfiguration), Optional.of(taskConfiguration)); + } + @Override @Deprecated public void setType(final EvaluatorType type) { @@ -173,26 +186,37 @@ final class AllocatedEvaluatorImpl implements AllocatedEvaluator { private void launch(final Configuration contextConfiguration, final Optional<Configuration> serviceConfiguration, final Optional<Configuration> taskConfiguration) { + launchWithTaskString(contextConfiguration, serviceConfiguration, + Optional.of(this.configurationSerializer.toString(taskConfiguration.get()))); + } + + private void launchWithTaskString(final Configuration contextConfiguration, + final Optional<Configuration> serviceConfiguration, + final Optional<String> taskConfiguration) { try (final LoggingScope lb = loggingScopeFactory.evaluatorLaunch(this.getId())) { final Configuration evaluatorConfiguration = makeEvaluatorConfiguration(contextConfiguration, serviceConfiguration, taskConfiguration); - final ResourceLaunchEventImpl.Builder rbuilder = - ResourceLaunchEventImpl.newBuilder() - .setIdentifier(this.evaluatorManager.getId()) - .setRemoteId(this.remoteID) - .setEvaluatorConf(evaluatorConfiguration) - .addFiles(this.files) - .addLibraries(this.libraries); - - rbuilder.setProcess(this.evaluatorManager.getEvaluatorDescriptor().getProcess()); - this.evaluatorManager.onResourceLaunch(rbuilder.build()); + resourceBuildAndLaunch(evaluatorConfiguration); } } + private void resourceBuildAndLaunch(final Configuration evaluatorConfiguration) { + final ResourceLaunchEventImpl.Builder rbuilder = + ResourceLaunchEventImpl.newBuilder() + .setIdentifier(this.evaluatorManager.getId()) + .setRemoteId(this.remoteID) + .setEvaluatorConf(evaluatorConfiguration) + .addFiles(this.files) + .addLibraries(this.libraries); + + rbuilder.setProcess(this.evaluatorManager.getEvaluatorDescriptor().getProcess()); + this.evaluatorManager.onResourceLaunch(rbuilder.build()); + } + private Configuration makeEvaluatorConfiguration(final Configuration contextConfiguration, final Optional<Configuration> serviceConfiguration, - final Optional<Configuration> taskConfiguration) { + final Optional<String> taskConfiguration) { final String contextConfigurationString = this.configurationSerializer.toString(contextConfiguration); final ConfigurationModule evaluatorConfigModule; @@ -217,7 +241,7 @@ final class AllocatedEvaluatorImpl implements AllocatedEvaluator { // Add the (optional) task configuration if (taskConfiguration.isPresent()) { - final String taskConfigurationString = this.configurationSerializer.toString(taskConfiguration.get()); + final String taskConfigurationString = taskConfiguration.get(); evaluatorConfigurationModule = evaluatorConfigurationModule .set(EvaluatorConfiguration.TASK_CONFIGURATION, taskConfigurationString); }
