Repository: incubator-reef
Updated Branches:
  refs/heads/master a89fb6859 -> 92806d0df


[REEF-574] Eliminate Task Configuration (de-)serialization for .NET

Currently Task configuration is passed over the bridge as a string. At Java 
side,
it is deserialized into a Configuration object with C# class hierarchy. Before
it is passed over to Evaluator, it is serialized again into a string.

This adds overloading methods to allow the task configuration passing around as
a string without deserialization. And then sent it directly to the Evaluator.
Those methods are added to the impl classes but not interface to avaoid being
exposed as public interface.

JIRA:
  [REEF-574](https://issues.apache.org/jira/browse/REEF-574)

Pull Request:
  This closes #355


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/92806d0d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/92806d0d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/92806d0d

Branch: refs/heads/master
Commit: 92806d0df809c8a05d165422420e716bddc29d9e
Parents: a89fb68
Author: Julia Wang <[email protected]>
Authored: Thu Aug 6 20:34:28 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Sat Aug 15 13:10:48 2015 -0700

----------------------------------------------------------------------
 .../reef/javabridge/ActiveContextBridge.java    | 14 ++--
 .../javabridge/AllocatedEvaluatorBridge.java    | 16 +++--
 .../common/driver/context/EvaluatorContext.java |  5 +-
 .../evaluator/AllocatedEvaluatorImpl.java       | 68 +++++++++++++-------
 4 files changed, 66 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/92806d0d/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridge.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridge.java
 
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridge.java
index 47df761..46f44fd 100644
--- 
a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridge.java
+++ 
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/ActiveContextBridge.java
@@ -20,8 +20,8 @@ package org.apache.reef.javabridge;
 
 import org.apache.reef.driver.context.ActiveContext;
 import org.apache.reef.io.naming.Identifiable;
+import org.apache.reef.runtime.common.driver.context.EvaluatorContext;
 import org.apache.reef.tang.ClassHierarchy;
-import org.apache.reef.tang.Configuration;
 import org.apache.reef.tang.formats.AvroConfigurationSerializer;
 
 import java.util.logging.Level;
@@ -51,15 +51,9 @@ public final class ActiveContextBridge extends NativeBridge 
implements Identifia
       throw new RuntimeException("empty taskConfigurationString provided.");
     }
 
-    final Configuration taskConfiguration;
-    try {
-      taskConfiguration = serializer.fromString(taskConfigurationString, 
clrClassHierarchy);
-    } catch (final Exception e) {
-      final String message = "Unable to de-serialize CLR  task configurations 
using class hierarchy.";
-      LOG.log(Level.SEVERE, message, e);
-      throw new RuntimeException(message, e);
-    }
-    jactiveContext.submitTask(taskConfiguration);
+    //when submit over the bridge, we would keep the task configuration as a 
serialized string
+    //submitTask(String taskConfig) is not exposed in the interface. Therefore 
cast is necessary.
+    ((EvaluatorContext)jactiveContext).submitTask(taskConfigurationString);
   }
 
   public String getEvaluatorDescriptorSring() {

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/92806d0d/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/AllocatedEvaluatorBridge.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/AllocatedEvaluatorBridge.java
 
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/AllocatedEvaluatorBridge.java
index 6272046..482cd16 100644
--- 
a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/AllocatedEvaluatorBridge.java
+++ 
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/AllocatedEvaluatorBridge.java
@@ -18,6 +18,7 @@
  */
 package org.apache.reef.javabridge;
 
+import org.apache.reef.runtime.common.driver.evaluator.AllocatedEvaluatorImpl;
 import org.apache.reef.driver.evaluator.AllocatedEvaluator;
 import org.apache.reef.tang.ClassHierarchy;
 import org.apache.reef.tang.Configuration;
@@ -70,13 +71,16 @@ public final class AllocatedEvaluatorBridge extends 
NativeBridge {
     final Configuration taskConfiguration;
     try {
       contextConfiguration = serializer.fromString(contextConfigurationString, 
clrClassHierarchy);
-      taskConfiguration = serializer.fromString(taskConfigurationString, 
clrClassHierarchy);
     } catch (final Exception e) {
       final String message = "Unable to de-serialize CLR context or task 
configurations using class hierarchy.";
       LOG.log(Level.SEVERE, message, e);
       throw new RuntimeException(message, e);
     }
-    jallocatedEvaluator.submitContextAndTask(contextConfiguration, 
taskConfiguration);
+
+    //When submit over the bridge, we would keep the task configuration as a 
serialized string.
+    //submitContextAndTask(final Configuration contextConfiguration,
+    //final String taskConfiguration) is not exposed in the interface. 
Therefore cast is necessary.
+    
((AllocatedEvaluatorImpl)jallocatedEvaluator).submitContextAndTask(contextConfiguration,
 taskConfigurationString);
   }
 
   /**
@@ -150,14 +154,18 @@ public final class AllocatedEvaluatorBridge extends 
NativeBridge {
     try {
       contextConfiguration = serializer.fromString(contextConfigurationString, 
clrClassHierarchy);
       servicetConfiguration = 
serializer.fromString(serviceConfigurationString, clrClassHierarchy);
-      taskConfiguration = serializer.fromString(taskConfigurationString, 
clrClassHierarchy);
     } catch (final Exception e) {
       final String message =
           "Unable to de-serialize CLR context or service or task 
configurations using class hierarchy.";
       LOG.log(Level.SEVERE, message, e);
       throw new RuntimeException(message, e);
     }
-    jallocatedEvaluator.submitContextAndServiceAndTask(contextConfiguration, 
servicetConfiguration, taskConfiguration);
+
+    //When submit over the bridge, we would keep the task configuration as a 
serialized string.
+    //submitContextAndServiceAndTask(final Configuration contextConfiguration, 
final Configuration serviceConfiguration,
+    //final String taskConfiguration) is not exposed in the interface. 
Therefore cast is necessary.
+    ((AllocatedEvaluatorImpl)jallocatedEvaluator)
+        .submitContextAndServiceAndTask(contextConfiguration, 
servicetConfiguration, taskConfigurationString);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/92806d0d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/EvaluatorContext.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/EvaluatorContext.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/EvaluatorContext.java
index 552f84a..ee0232d 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/EvaluatorContext.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/context/EvaluatorContext.java
@@ -124,7 +124,10 @@ public final class EvaluatorContext implements 
ActiveContext {
 
   @Override
   public synchronized void submitTask(final Configuration taskConf) {
+    submitTask(this.configurationSerializer.toString(taskConf));
+  }
 
+  public synchronized void submitTask(final String taskConf) {
     if (this.isClosed) {
       throw new RuntimeException("Active context already closed");
     }
@@ -137,7 +140,7 @@ public final class EvaluatorContext implements 
ActiveContext {
             .setStartTask(
                 EvaluatorRuntimeProtocol.StartTaskProto.newBuilder()
                     .setContextId(this.contextIdentifier)
-                    
.setConfiguration(this.configurationSerializer.toString(taskConf))
+                    .setConfiguration(taskConf)
                     .build())
             .build();
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/92806d0d/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/AllocatedEvaluatorImpl.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/AllocatedEvaluatorImpl.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/AllocatedEvaluatorImpl.java
index 5dc0502..31b2ec7 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/AllocatedEvaluatorImpl.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/AllocatedEvaluatorImpl.java
@@ -21,13 +21,7 @@ package org.apache.reef.runtime.common.driver.evaluator;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.driver.context.ContextConfiguration;
-import org.apache.reef.driver.evaluator.AllocatedEvaluator;
-import org.apache.reef.driver.evaluator.CLRProcessFactory;
-import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
-import org.apache.reef.driver.evaluator.EvaluatorType;
-import org.apache.reef.driver.evaluator.EvaluatorProcess;
-import org.apache.reef.driver.evaluator.JVMProcessFactory;
-import org.apache.reef.driver.evaluator.CLRProcess;
+import org.apache.reef.driver.evaluator.*;
 import org.apache.reef.runtime.common.driver.api.ResourceLaunchEventImpl;
 import org.apache.reef.runtime.common.evaluator.EvaluatorConfiguration;
 import org.apache.reef.tang.Configuration;
@@ -52,7 +46,7 @@ import java.util.logging.Logger;
  */
 @DriverSide
 @Private
-final class AllocatedEvaluatorImpl implements AllocatedEvaluator {
+public final class AllocatedEvaluatorImpl implements AllocatedEvaluator {
 
   private static final Logger LOG = 
Logger.getLogger(AllocatedEvaluatorImpl.class.getName());
 
@@ -112,6 +106,13 @@ final class AllocatedEvaluatorImpl implements 
AllocatedEvaluator {
 
   }
 
+  public void submitTask(final String taskConfiguration) {
+    final Configuration contextConfiguration = ContextConfiguration.CONF
+        .set(ContextConfiguration.IDENTIFIER, "RootContext_" + this.getId())
+        .build();
+    this.submitContextAndTask(contextConfiguration, taskConfiguration);
+  }
+
   @Override
   public EvaluatorDescriptor getEvaluatorDescriptor() {
     return this.evaluatorManager.getEvaluatorDescriptor();
@@ -120,13 +121,13 @@ final class AllocatedEvaluatorImpl implements 
AllocatedEvaluator {
 
   @Override
   public void submitContext(final Configuration contextConfiguration) {
-    launch(contextConfiguration, Optional.<Configuration>empty(), 
Optional.<Configuration>empty());
+    launchWithTaskString(contextConfiguration, 
Optional.<Configuration>empty(), Optional.<String>empty());
   }
 
   @Override
   public void submitContextAndService(final Configuration contextConfiguration,
                                       final Configuration 
serviceConfiguration) {
-    launch(contextConfiguration, Optional.of(serviceConfiguration), 
Optional.<Configuration>empty());
+    launchWithTaskString(contextConfiguration, 
Optional.of(serviceConfiguration), Optional.<String>empty());
   }
 
   @Override
@@ -135,6 +136,12 @@ final class AllocatedEvaluatorImpl implements 
AllocatedEvaluator {
     launch(contextConfiguration, Optional.<Configuration>empty(), 
Optional.of(taskConfiguration));
   }
 
+  public void submitContextAndTask(final Configuration contextConfiguration,
+                                   final String taskConfiguration) {
+    launchWithTaskString(contextConfiguration, 
Optional.<Configuration>empty(), Optional.of(taskConfiguration));
+  }
+
+
   @Override
   public void submitContextAndServiceAndTask(final Configuration 
contextConfiguration,
                                              final Configuration 
serviceConfiguration,
@@ -142,6 +149,12 @@ final class AllocatedEvaluatorImpl implements 
AllocatedEvaluator {
     launch(contextConfiguration, Optional.of(serviceConfiguration), 
Optional.of(taskConfiguration));
   }
 
+  public void submitContextAndServiceAndTask(final Configuration 
contextConfiguration,
+                                             final Configuration 
serviceConfiguration,
+                                             final String taskConfiguration) {
+    launchWithTaskString(contextConfiguration, 
Optional.of(serviceConfiguration), Optional.of(taskConfiguration));
+  }
+
   @Override
   @Deprecated
   public void setType(final EvaluatorType type) {
@@ -173,26 +186,37 @@ final class AllocatedEvaluatorImpl implements 
AllocatedEvaluator {
   private void launch(final Configuration contextConfiguration,
                       final Optional<Configuration> serviceConfiguration,
                       final Optional<Configuration> taskConfiguration) {
+    launchWithTaskString(contextConfiguration, serviceConfiguration,
+        
Optional.of(this.configurationSerializer.toString(taskConfiguration.get())));
+  }
+
+  private void launchWithTaskString(final Configuration contextConfiguration,
+                      final Optional<Configuration> serviceConfiguration,
+                      final Optional<String> taskConfiguration) {
     try (final LoggingScope lb = 
loggingScopeFactory.evaluatorLaunch(this.getId())) {
       final Configuration evaluatorConfiguration =
           makeEvaluatorConfiguration(contextConfiguration, 
serviceConfiguration, taskConfiguration);
 
-      final ResourceLaunchEventImpl.Builder rbuilder =
-          ResourceLaunchEventImpl.newBuilder()
-              .setIdentifier(this.evaluatorManager.getId())
-              .setRemoteId(this.remoteID)
-              .setEvaluatorConf(evaluatorConfiguration)
-              .addFiles(this.files)
-              .addLibraries(this.libraries);
-
-      
rbuilder.setProcess(this.evaluatorManager.getEvaluatorDescriptor().getProcess());
-      this.evaluatorManager.onResourceLaunch(rbuilder.build());
+      resourceBuildAndLaunch(evaluatorConfiguration);
     }
   }
 
+  private void resourceBuildAndLaunch(final Configuration 
evaluatorConfiguration) {
+    final ResourceLaunchEventImpl.Builder rbuilder =
+        ResourceLaunchEventImpl.newBuilder()
+            .setIdentifier(this.evaluatorManager.getId())
+            .setRemoteId(this.remoteID)
+            .setEvaluatorConf(evaluatorConfiguration)
+            .addFiles(this.files)
+            .addLibraries(this.libraries);
+
+    
rbuilder.setProcess(this.evaluatorManager.getEvaluatorDescriptor().getProcess());
+    this.evaluatorManager.onResourceLaunch(rbuilder.build());
+  }
+
   private Configuration makeEvaluatorConfiguration(final Configuration 
contextConfiguration,
                                                    final 
Optional<Configuration> serviceConfiguration,
-                                                   final 
Optional<Configuration> taskConfiguration) {
+                                                   final Optional<String> 
taskConfiguration) {
 
     final String contextConfigurationString = 
this.configurationSerializer.toString(contextConfiguration);
     final ConfigurationModule evaluatorConfigModule;
@@ -217,7 +241,7 @@ final class AllocatedEvaluatorImpl implements 
AllocatedEvaluator {
 
     // Add the (optional) task configuration
     if (taskConfiguration.isPresent()) {
-      final String taskConfigurationString = 
this.configurationSerializer.toString(taskConfiguration.get());
+      final String taskConfigurationString = taskConfiguration.get();
       evaluatorConfigurationModule = evaluatorConfigurationModule
           .set(EvaluatorConfiguration.TASK_CONFIGURATION, 
taskConfigurationString);
     }

Reply via email to