Repository: incubator-reef
Updated Branches:
  refs/heads/master cb31da9f2 -> 803028bee


[REEF-624] Replace all use of DriverBridgeConfiguration with DriverConfiguration

This addressed the issue by
  * replacing all usages of DriverBridgeConfiguration in examples and tests
    with DriverConfiguration
  * adding IObservable<IDriverStarted> and necessary methods to test drivers
  * removing unused IStartHandler from test drivers

JIRA:
  [REEF-624](https://issues.apache.org/jira/browse/REEF-624)

Pull Request:
  Closes #450


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/803028be
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/803028be
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/803028be

Branch: refs/heads/master
Commit: 803028bee1e9bdf75d380e95073669e1bb00f328
Parents: cb31da9
Author: Mariia Mykhailova <[email protected]>
Authored: Mon Aug 31 16:39:05 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Tue Sep 1 17:25:01 2015 -0700

----------------------------------------------------------------------
 .../KMeans/KMeansDriverHandlers.cs              | 34 ++++++++++----------
 .../BroadcastAndReduceClient.cs                 | 21 ++++++++----
 .../PipelineBroadcastAndReduceClient.cs         | 24 ++++++++------
 .../BroadcastReduceDriver.cs                    | 27 +++++++++-------
 .../PipelinedBroadcastReduceDriver.cs           | 27 +++++++++-------
 .../ScatterReduceDriver.cs                      | 27 +++++++++-------
 .../Functional/Driver/DriverTestStartHandler.cs | 17 ++++++++--
 .../Functional/Driver/TestDriver.cs             | 11 ++++---
 .../Functional/Group/BroadcastReduceTest.cs     | 23 ++++++++-----
 .../Group/PipelinedBroadcastReduceTest.cs       | 23 ++++++++-----
 .../Functional/Group/ScatterReduceTest.cs       | 21 ++++++++----
 .../Functional/ML/KMeans/TestKMeans.cs          | 23 ++++++++-----
 .../Functional/Messaging/MessageDriver.cs       | 26 ++++++++-------
 .../Functional/Messaging/TestTaskMessage.cs     | 28 ++++++++++------
 14 files changed, 206 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/803028be/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
 
b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
index ae22701..24e82dd 100644
--- 
a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
+++ 
b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
@@ -47,10 +47,9 @@ using Org.Apache.REEF.Network.Group.Topology;
 namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
 {
     public class KMeansDriverHandlers : 
-        IStartHandler, 
-        IObserver<IEvaluatorRequestor>,
         IObserver<IAllocatedEvaluator>,
-        IObserver<IActiveContext>
+        IObserver<IActiveContext>, 
+        IObserver<IDriverStarted>
     {
         private static readonly Logger _Logger = 
Logger.GetLogger(typeof(KMeansDriverHandlers));
         private readonly object _lockObj = new object();
@@ -66,12 +65,14 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
         private readonly IConfiguration _centroidCodecConf;
         private readonly IConfiguration _controlMessageCodecConf;
         private readonly IConfiguration _processedResultsCodecConf;
-
+        private readonly IEvaluatorRequestor _evaluatorRequestor;
 
         [Inject]
-        public KMeansDriverHandlers([Parameter(typeof(NumPartitions))] int 
numPartitions, GroupCommDriver groupCommDriver)
+        public KMeansDriverHandlers(
+            [Parameter(typeof(NumPartitions))] int numPartitions, 
+            GroupCommDriver groupCommDriver,
+            IEvaluatorRequestor evaluatorRequestor)
         {
-            Identifier = "KMeansDriverId";
             _executionDirectory = 
Path.Combine(Directory.GetCurrentDirectory(), 
Constants.KMeansExecutionBaseDirectory, 
Guid.NewGuid().ToString("N").Substring(0, 4));
             ISet<string> arguments = 
ClrHandlerHelper.GetCommandLineArguments();
             string dataFile = arguments.Single(a => a.StartsWith("DataFile", 
StringComparison.Ordinal)).Split(':')[1];
@@ -84,6 +85,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
             _totalEvaluators = numPartitions + 1;
 
             _groupCommDriver = groupCommDriver;
+            _evaluatorRequestor = evaluatorRequestor;
 
             _centroidCodecConf = 
CodecToStreamingCodecConfiguration<Centroids>.Conf
                 .Set(CodecConfiguration<Centroids>.Codec, 
GenericType<CentroidsCodec>.Class)
@@ -124,17 +126,6 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
             CreateClassHierarchy();  
         }
 
-        public string Identifier { get; set; }
-
-        public void OnNext(IEvaluatorRequestor evalutorRequestor)
-        {
-            int memory = 2048;
-            int core = 1;
-            EvaluatorRequest request = new EvaluatorRequest(_totalEvaluators, 
memory, core);
-
-            evalutorRequestor.Submit(request);
-        }
-
         public void OnNext(IAllocatedEvaluator allocatedEvaluator)
         {
             IConfiguration contextConfiguration = 
_groupCommDriver.GetContextConfiguration();
@@ -197,6 +188,15 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
             _groupCommTaskStarter.QueueTask(taskConfiguration, activeContext);
         }
 
+        public void OnNext(IDriverStarted value)
+        {
+            int memory = 2048;
+            int core = 1;
+            EvaluatorRequest request = new EvaluatorRequest(_totalEvaluators, 
memory, core);
+
+            _evaluatorRequestor.Submit(request);
+        }
+
         public void OnError(Exception error)
         {
             throw new NotImplementedException();

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/803028be/lang/cs/Org.Apache.REEF.Network.Examples.Client/BroadcastAndReduceClient.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Examples.Client/BroadcastAndReduceClient.cs 
b/lang/cs/Org.Apache.REEF.Network.Examples.Client/BroadcastAndReduceClient.cs
index 49a9b45..9d21d8b 100644
--- 
a/lang/cs/Org.Apache.REEF.Network.Examples.Client/BroadcastAndReduceClient.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network.Examples.Client/BroadcastAndReduceClient.cs
@@ -26,6 +26,7 @@ using Org.Apache.REEF.Driver.Bridge;
 using Org.Apache.REEF.Network.Examples.GroupCommunication;
 using 
Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDriverAndTasks;
 using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Naming;
 using Org.Apache.REEF.Network.NetworkService;
 using Org.Apache.REEF.Tang.Implementations.Configuration;
 using Org.Apache.REEF.Tang.Implementations.Tang;
@@ -46,13 +47,12 @@ namespace Org.Apache.REEF.Network.Examples.Client
             const int fanOut = 2;
 
             IConfiguration driverConfig = 
TangFactory.GetTang().NewConfigurationBuilder(
-                DriverBridgeConfiguration.ConfigurationModule
-                    .Set(DriverBridgeConfiguration.OnDriverStarted, 
GenericType<BroadcastReduceDriver>.Class)
-                    .Set(DriverBridgeConfiguration.OnEvaluatorAllocated, 
GenericType<BroadcastReduceDriver>.Class)
-                    .Set(DriverBridgeConfiguration.OnEvaluatorRequested, 
GenericType<BroadcastReduceDriver>.Class)
-                    .Set(DriverBridgeConfiguration.OnEvaluatorFailed, 
GenericType<BroadcastReduceDriver>.Class)
-                    .Set(DriverBridgeConfiguration.OnContextActive, 
GenericType<BroadcastReduceDriver>.Class)
-                    .Set(DriverBridgeConfiguration.CustomTraceLevel, 
Level.Info.ToString())
+                DriverConfiguration.ConfigurationModule
+                    .Set(DriverConfiguration.OnDriverStarted, 
GenericType<BroadcastReduceDriver>.Class)
+                    .Set(DriverConfiguration.OnEvaluatorAllocated, 
GenericType<BroadcastReduceDriver>.Class)
+                    .Set(DriverConfiguration.OnEvaluatorFailed, 
GenericType<BroadcastReduceDriver>.Class)
+                    .Set(DriverConfiguration.OnContextActive, 
GenericType<BroadcastReduceDriver>.Class)
+                    .Set(DriverConfiguration.CustomTraceLevel, 
Level.Info.ToString())
                     .Build())
                 .BindNamedParameter<GroupTestConfig.NumIterations, int>(
                     GenericType<GroupTestConfig.NumIterations>.Class,
@@ -78,6 +78,13 @@ namespace Org.Apache.REEF.Network.Examples.Client
 
             IConfiguration merged = Configurations.Merge(driverConfig, 
groupCommDriverConfig);
 
+            IConfiguration taskConfig = 
TangFactory.GetTang().NewConfigurationBuilder()
+                
.BindSetEntry<DriverBridgeConfigurationOptions.SetOfAssemblies, 
string>(typeof(MasterTask).Assembly.GetName().Name)
+                
.BindSetEntry<DriverBridgeConfigurationOptions.SetOfAssemblies, 
string>(typeof(NameClient).Assembly.GetName().Name)
+                .Build();
+
+            merged = Configurations.Merge(merged, taskConfig);
+
             HashSet<string> appDlls = new HashSet<string>();
             appDlls.Add(typeof(IDriver).Assembly.GetName().Name);
             appDlls.Add(typeof(ITask).Assembly.GetName().Name);

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/803028be/lang/cs/Org.Apache.REEF.Network.Examples.Client/PipelineBroadcastAndReduceClient.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Examples.Client/PipelineBroadcastAndReduceClient.cs
 
b/lang/cs/Org.Apache.REEF.Network.Examples.Client/PipelineBroadcastAndReduceClient.cs
index 16aa698..76aa58b 100644
--- 
a/lang/cs/Org.Apache.REEF.Network.Examples.Client/PipelineBroadcastAndReduceClient.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network.Examples.Client/PipelineBroadcastAndReduceClient.cs
@@ -30,6 +30,7 @@ using Org.Apache.REEF.Driver.Bridge;
 using Org.Apache.REEF.Network.Examples.GroupCommunication;
 using 
Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastReduceDriverAndTasks;
 using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Naming;
 using Org.Apache.REEF.Network.NetworkService;
 using Org.Apache.REEF.Tang.Implementations.Configuration;
 using Org.Apache.REEF.Tang.Implementations.Tang;
@@ -44,15 +45,12 @@ namespace Org.Apache.REEF.Network.Examples.Client
         public void RunPipelineBroadcastAndReduce(bool runOnYarn, int 
numTasks, int startingPortNo, int portRange, int arraySize, int chunkSize)
         {
             IConfiguration driverConfig = 
TangFactory.GetTang().NewConfigurationBuilder(
-                DriverBridgeConfiguration.ConfigurationModule
-                    .Set(DriverBridgeConfiguration.OnDriverStarted, 
GenericType<PipelinedBroadcastReduceDriver>.Class)
-                    .Set(DriverBridgeConfiguration.OnEvaluatorAllocated,
-                        GenericType<PipelinedBroadcastReduceDriver>.Class)
-                    .Set(DriverBridgeConfiguration.OnEvaluatorRequested,
-                        GenericType<PipelinedBroadcastReduceDriver>.Class)
-                    .Set(DriverBridgeConfiguration.OnEvaluatorFailed, 
GenericType<PipelinedBroadcastReduceDriver>.Class)
-                    .Set(DriverBridgeConfiguration.OnContextActive, 
GenericType<PipelinedBroadcastReduceDriver>.Class)
-                    .Set(DriverBridgeConfiguration.CustomTraceLevel, 
Level.Info.ToString())
+                DriverConfiguration.ConfigurationModule
+                    .Set(DriverConfiguration.OnDriverStarted, 
GenericType<PipelinedBroadcastReduceDriver>.Class)
+                    .Set(DriverConfiguration.OnEvaluatorAllocated, 
GenericType<PipelinedBroadcastReduceDriver>.Class)
+                    .Set(DriverConfiguration.OnEvaluatorFailed, 
GenericType<PipelinedBroadcastReduceDriver>.Class)
+                    .Set(DriverConfiguration.OnContextActive, 
GenericType<PipelinedBroadcastReduceDriver>.Class)
+                    .Set(DriverConfiguration.CustomTraceLevel, 
Level.Info.ToString())
                     .Build())
                 .BindNamedParameter<GroupTestConfig.NumIterations, int>(
                     GenericType<GroupTestConfig.NumIterations>.Class,
@@ -84,6 +82,14 @@ namespace Org.Apache.REEF.Network.Examples.Client
 
             IConfiguration merged = Configurations.Merge(driverConfig, 
groupCommDriverConfig);
 
+            IConfiguration taskConfig = 
TangFactory.GetTang().NewConfigurationBuilder()
+                
.BindSetEntry<DriverBridgeConfigurationOptions.SetOfAssemblies, 
string>(typeof(PipelinedMasterTask).Assembly.GetName().Name)
+                
.BindSetEntry<DriverBridgeConfigurationOptions.SetOfAssemblies, 
string>(typeof(PipelinedSlaveTask).Assembly.GetName().Name)
+                
.BindSetEntry<DriverBridgeConfigurationOptions.SetOfAssemblies, 
string>(typeof(NameClient).Assembly.GetName().Name)
+                .Build();
+
+            merged = Configurations.Merge(merged, taskConfig);
+
             HashSet<string> appDlls = new HashSet<string>();
             appDlls.Add(typeof(IDriver).Assembly.GetName().Name);
             appDlls.Add(typeof(ITask).Assembly.GetName().Name);

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/803028be/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs
 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs
index 051b23d..84082af 100644
--- 
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs
@@ -47,7 +47,11 @@ using Org.Apache.REEF.Wake.Remote.Parameters;
 
 namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDriverAndTasks
 {
-    public class BroadcastReduceDriver : IStartHandler, 
IObserver<IEvaluatorRequestor>, IObserver<IAllocatedEvaluator>, 
IObserver<IActiveContext>, IObserver<IFailedEvaluator>
+    public class BroadcastReduceDriver : 
+        IObserver<IAllocatedEvaluator>, 
+        IObserver<IActiveContext>, 
+        IObserver<IFailedEvaluator>, 
+        IObserver<IDriverStarted>
     {
         private static readonly Logger LOGGER = 
Logger.GetLogger(typeof(BroadcastReduceDriver));
 
@@ -59,6 +63,7 @@ namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri
         private readonly TaskStarter _groupCommTaskStarter;
         private readonly IConfiguration _tcpPortProviderConfig;
         private readonly IConfiguration _codecConfig;
+        private readonly IEvaluatorRequestor _evaluatorRequestor;
 
         [Inject]
         public BroadcastReduceDriver(
@@ -66,12 +71,13 @@ namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri
             [Parameter(typeof(GroupTestConfig.NumIterations))] int 
numIterations,
             [Parameter(typeof(GroupTestConfig.StartingPort))] int startingPort,
             [Parameter(typeof(GroupTestConfig.PortRange))] int portRange,
-            GroupCommDriver groupCommDriver)
+            GroupCommDriver groupCommDriver,
+            IEvaluatorRequestor evaluatorRequestor)
         {
-            Identifier = "BroadcastStartHandler";
             _numEvaluators = numEvaluators;
             _numIterations = numIterations;
             _groupCommDriver = groupCommDriver;
+            _evaluatorRequestor = evaluatorRequestor;
 
             _tcpPortProviderConfig = 
TangFactory.GetTang().NewConfigurationBuilder()
                 .BindNamedParameter<TcpPortRangeStart, 
int>(GenericType<TcpPortRangeStart>.Class,
@@ -112,14 +118,6 @@ namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri
             CreateClassHierarchy();
         }
 
-        public string Identifier { get; set; }
-
-        public void OnNext(IEvaluatorRequestor evaluatorRequestor)
-        {
-            EvaluatorRequest request = new EvaluatorRequest(_numEvaluators, 
512, 2, "WonderlandRack", "BroadcastEvaluator");
-            evaluatorRequestor.Submit(request);
-        }
-
         public void OnNext(IAllocatedEvaluator allocatedEvaluator)
         {
             IConfiguration contextConf = 
_groupCommDriver.GetContextConfiguration();
@@ -177,8 +175,15 @@ namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri
         {
         }
 
+        public void OnNext(IDriverStarted value)
+        {
+            EvaluatorRequest request = new EvaluatorRequest(_numEvaluators, 
512, 2, "WonderlandRack", "BroadcastEvaluator");
+            _evaluatorRequestor.Submit(request);
+        }
+
         public void OnError(Exception error)
         {
+            throw error;
         }
 
         public void OnCompleted()

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/803028be/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs
 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs
index 6675fb5..32439bd 100644
--- 
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs
@@ -47,8 +47,11 @@ using 
Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs;
 
 namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastReduceDriverAndTasks
 {
-    public class PipelinedBroadcastReduceDriver : IStartHandler, 
IObserver<IEvaluatorRequestor>,
-        IObserver<IAllocatedEvaluator>, IObserver<IActiveContext>, 
IObserver<IFailedEvaluator>
+    public class PipelinedBroadcastReduceDriver : 
+        IObserver<IAllocatedEvaluator>, 
+        IObserver<IActiveContext>, 
+        IObserver<IFailedEvaluator>, 
+        IObserver<IDriverStarted>
     {
         private static readonly Logger Logger = 
Logger.GetLogger(typeof(PipelinedBroadcastReduceDriver));
         private readonly int _arraySize;
@@ -59,6 +62,7 @@ namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
         private readonly int _numIterations;
         private readonly IConfiguration _tcpPortProviderConfig;
         private readonly IConfiguration _codecConfig;
+        private readonly IEvaluatorRequestor _evaluatorRequestor;
 
         [Inject]
         public PipelinedBroadcastReduceDriver(
@@ -68,14 +72,15 @@ namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
             [Parameter(typeof(GroupTestConfig.PortRange))] int portRange,
             [Parameter(typeof(GroupTestConfig.ChunkSize))] int chunkSize,
             [Parameter(typeof(GroupTestConfig.ArraySize))] int arraySize,
-            GroupCommDriver groupCommDriver)
+            GroupCommDriver groupCommDriver,
+            IEvaluatorRequestor evaluatorRequestor)
         {
             Logger.Log(Level.Info, "entering the driver code " + chunkSize);
 
-            Identifier = "BroadcastStartHandler";
             _numEvaluators = numEvaluators;
             _numIterations = numIterations;
             _arraySize = arraySize;
+            _evaluatorRequestor = evaluatorRequestor;
 
             _tcpPortProviderConfig = 
TangFactory.GetTang().NewConfigurationBuilder()
                 .BindNamedParameter<TcpPortRangeStart, 
int>(GenericType<TcpPortRangeStart>.Class,
@@ -122,13 +127,6 @@ namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
 
             CreateClassHierarchy();
         }
-        public string Identifier { get; set; }
-
-        public void OnNext(IEvaluatorRequestor evaluatorRequestor)
-        {
-            EvaluatorRequest request = new EvaluatorRequest(_numEvaluators, 
512, 2, "WonderlandRack", "BroadcastEvaluator");
-            evaluatorRequestor.Submit(request);
-        }
 
         public void OnNext(IAllocatedEvaluator allocatedEvaluator)
         {
@@ -190,6 +188,7 @@ namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
 
         public void OnError(Exception error)
         {
+            throw error;
         }
 
         public void OnCompleted()
@@ -200,6 +199,12 @@ namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
         {
         }
 
+        public void OnNext(IDriverStarted value)
+        {
+            EvaluatorRequest request = new EvaluatorRequest(_numEvaluators, 
512, 2, "WonderlandRack", "BroadcastEvaluator");
+            _evaluatorRequestor.Submit(request);
+        }
+
         private void CreateClassHierarchy()
         {
             var clrDlls = new HashSet<string>();

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/803028be/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs
 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs
index 10595fd..d2a7ea5 100644
--- 
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs
+++ 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs
@@ -47,7 +47,11 @@ using 
Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs;
 
 namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.ScatterReduceDriverAndTasks
 {
-    public class ScatterReduceDriver : IStartHandler, 
IObserver<IEvaluatorRequestor>, IObserver<IAllocatedEvaluator>, 
IObserver<IActiveContext>, IObserver<IFailedEvaluator>
+    public class ScatterReduceDriver : 
+        IObserver<IAllocatedEvaluator>, 
+        IObserver<IActiveContext>, 
+        IObserver<IFailedEvaluator>, 
+        IObserver<IDriverStarted>
     {
         private static readonly Logger LOGGER = 
Logger.GetLogger(typeof(ScatterReduceDriver));
 
@@ -57,15 +61,17 @@ namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.ScatterReduceDrive
         private readonly ICommunicationGroupDriver _commGroup;
         private readonly TaskStarter _groupCommTaskStarter;
         private readonly IConfiguration _codecConfig;
+        private readonly IEvaluatorRequestor _evaluatorRequestor;
 
         [Inject]
         public ScatterReduceDriver(
             [Parameter(typeof(GroupTestConfig.NumEvaluators))] int 
numEvaluators,
-            GroupCommDriver groupCommDriver)
+            GroupCommDriver groupCommDriver,
+            IEvaluatorRequestor evaluatorRequestor)
         {
-            Identifier = "BroadcastStartHandler";
             _numEvaluators = numEvaluators;
             _groupCommDriver = groupCommDriver;
+            _evaluatorRequestor = evaluatorRequestor;
 
             _codecConfig = StreamingCodecConfiguration<int>.Conf
                .Set(StreamingCodecConfiguration<int>.Codec, 
GenericType<IntStreamingCodec>.Class)
@@ -99,14 +105,6 @@ namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.ScatterReduceDrive
             CreateClassHierarchy();
         }
 
-        public string Identifier { get; set; }
-
-        public void OnNext(IEvaluatorRequestor evaluatorRequestor)
-        {
-            EvaluatorRequest request = new EvaluatorRequest(_numEvaluators, 
512, 2, "WonderlandRack", "BroadcastEvaluator");
-            evaluatorRequestor.Submit(request);
-        }
-
         public void OnNext(IAllocatedEvaluator allocatedEvaluator)
         {
             IConfiguration contextConf = 
_groupCommDriver.GetContextConfiguration();
@@ -148,8 +146,15 @@ namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.ScatterReduceDrive
         {
         }
 
+        public void OnNext(IDriverStarted value)
+        {
+            EvaluatorRequest request = new EvaluatorRequest(_numEvaluators, 
512, 2, "WonderlandRack", "BroadcastEvaluator");
+            _evaluatorRequestor.Submit(request);
+        }
+
         public void OnError(Exception error)
         {
+            throw error;
         }
 
         public void OnCompleted()

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/803028be/lang/cs/Org.Apache.REEF.Tests/Functional/Driver/DriverTestStartHandler.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/Driver/DriverTestStartHandler.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/Driver/DriverTestStartHandler.cs
index c5f6ecd..385658f 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Driver/DriverTestStartHandler.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Driver/DriverTestStartHandler.cs
@@ -17,6 +17,7 @@
  * under the License.
  */
 
+using System;
 using Org.Apache.REEF.Driver;
 using Org.Apache.REEF.Driver.Bridge;
 using Org.Apache.REEF.Tang.Annotations;
@@ -25,7 +26,7 @@ using Org.Apache.REEF.Wake.Time;
 
 namespace Org.Apache.REEF.Tests.Functional.Driver
 {
-    public class DriverTestStartHandler : IStartHandler
+    public class DriverTestStartHandler : IObserver<IDriverStarted>
     {
         private static readonly Logger LOGGER = 
Logger.GetLogger(typeof(DriverTestStartHandler));
 
@@ -37,10 +38,20 @@ namespace Org.Apache.REEF.Tests.Functional.Driver
         {
             _clock = clock;
             _httpServerPort = httpServerPort;
-            Identifier = "DriverTestStartHandler";
             LOGGER.Log(Level.Info, "Http Server port number: " + 
httpServerPort.PortNumber);
         }
 
-        public string Identifier { get; set; }
+        public void OnNext(IDriverStarted value)
+        {
+        }
+
+        public void OnError(Exception error)
+        {
+            throw error;
+        }
+
+        public void OnCompleted()
+        {
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/803028be/lang/cs/Org.Apache.REEF.Tests/Functional/Driver/TestDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Driver/TestDriver.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/Driver/TestDriver.cs
index 18aa59b..3de3ed2 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Driver/TestDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Driver/TestDriver.cs
@@ -19,6 +19,7 @@
 
 using System.Collections.Generic;
 using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Driver;
 using Org.Apache.REEF.Driver.Bridge;
 using Org.Apache.REEF.Driver.Defaults;
 using Org.Apache.REEF.Tang.Interface;
@@ -46,15 +47,15 @@ namespace Org.Apache.REEF.Tests.Functional.Driver
         /// This is to test DriverTestStartHandler. No evaluator and tasks are 
involked.
         /// </summary>
         [TestMethod, Priority(1), TestCategory("FunctionalGated")]
-        [Description("Test DriverTestStartHandler. No evaluator and tasks are 
involked")]
+        [Description("Test DriverTestStartHandler. No evaluator and tasks are 
invoked")]
         [DeploymentItem(@".")]
         [Timeout(180 * 1000)]
         public void TestDriverStart()
         {
-            IConfiguration driverConfig = 
DriverBridgeConfiguration.ConfigurationModule
-             .Set(DriverBridgeConfiguration.OnDriverStarted, 
GenericType<DriverTestStartHandler>.Class)
-             .Set(DriverBridgeConfiguration.CustomTraceListeners, 
GenericType<DefaultCustomTraceListener>.Class)
-             .Set(DriverBridgeConfiguration.CustomTraceLevel, 
Level.Info.ToString())
+            IConfiguration driverConfig = 
DriverConfiguration.ConfigurationModule
+             .Set(DriverConfiguration.OnDriverStarted, 
GenericType<DriverTestStartHandler>.Class)
+             .Set(DriverConfiguration.CustomTraceListeners, 
GenericType<DefaultCustomTraceListener>.Class)
+             .Set(DriverConfiguration.CustomTraceLevel, Level.Info.ToString())
              .Build();
 
             HashSet<string> appDlls = new HashSet<string>();

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/803028be/lang/cs/Org.Apache.REEF.Tests/Functional/Group/BroadcastReduceTest.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/Group/BroadcastReduceTest.cs 
b/lang/cs/Org.Apache.REEF.Tests/Functional/Group/BroadcastReduceTest.cs
index 7dc3423..6150d41 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Group/BroadcastReduceTest.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Group/BroadcastReduceTest.cs
@@ -27,6 +27,7 @@ using Org.Apache.REEF.Driver.Bridge;
 using Org.Apache.REEF.Network.Examples.GroupCommunication;
 using 
Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDriverAndTasks;
 using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Naming;
 using Org.Apache.REEF.Network.NetworkService;
 using Org.Apache.REEF.Tang.Implementations.Configuration;
 using Org.Apache.REEF.Tang.Implementations.Tang;
@@ -71,13 +72,12 @@ namespace Org.Apache.REEF.Tests.Functional.Group
         public void TestBroadcastAndReduce(bool runOnYarn, int numTasks)
         {
             IConfiguration driverConfig = 
TangFactory.GetTang().NewConfigurationBuilder(
-                DriverBridgeConfiguration.ConfigurationModule
-                    .Set(DriverBridgeConfiguration.OnDriverStarted, 
GenericType<BroadcastReduceDriver>.Class)
-                    .Set(DriverBridgeConfiguration.OnEvaluatorAllocated, 
GenericType<BroadcastReduceDriver>.Class)
-                    .Set(DriverBridgeConfiguration.OnEvaluatorRequested, 
GenericType<BroadcastReduceDriver>.Class)
-                    .Set(DriverBridgeConfiguration.OnEvaluatorFailed, 
GenericType<BroadcastReduceDriver>.Class)
-                    .Set(DriverBridgeConfiguration.OnContextActive, 
GenericType<BroadcastReduceDriver>.Class)
-                    .Set(DriverBridgeConfiguration.CustomTraceLevel, 
Level.Info.ToString())
+                DriverConfiguration.ConfigurationModule
+                    .Set(DriverConfiguration.OnDriverStarted, 
GenericType<BroadcastReduceDriver>.Class)
+                    .Set(DriverConfiguration.OnEvaluatorAllocated, 
GenericType<BroadcastReduceDriver>.Class)
+                    .Set(DriverConfiguration.OnEvaluatorFailed, 
GenericType<BroadcastReduceDriver>.Class)
+                    .Set(DriverConfiguration.OnContextActive, 
GenericType<BroadcastReduceDriver>.Class)
+                    .Set(DriverConfiguration.CustomTraceLevel, 
Level.Info.ToString())
                     .Build())
                 .BindNamedParameter<GroupTestConfig.NumIterations, int>(
                     GenericType<GroupTestConfig.NumIterations>.Class,
@@ -96,7 +96,14 @@ namespace Org.Apache.REEF.Tests.Functional.Group
                 .Build();
 
             IConfiguration merged = Configurations.Merge(driverConfig, 
groupCommDriverConfig);
-                    
+
+            IConfiguration taskConfig = 
TangFactory.GetTang().NewConfigurationBuilder()
+                
.BindSetEntry<DriverBridgeConfigurationOptions.SetOfAssemblies, 
string>(typeof(MasterTask).Assembly.GetName().Name)
+                
.BindSetEntry<DriverBridgeConfigurationOptions.SetOfAssemblies, 
string>(typeof(NameClient).Assembly.GetName().Name)
+                .Build();
+
+            merged = Configurations.Merge(merged, taskConfig);
+
             HashSet<string> appDlls = new HashSet<string>();
             appDlls.Add(typeof(IDriver).Assembly.GetName().Name);
             appDlls.Add(typeof(ITask).Assembly.GetName().Name);

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/803028be/lang/cs/Org.Apache.REEF.Tests/Functional/Group/PipelinedBroadcastReduceTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Group/PipelinedBroadcastReduceTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Group/PipelinedBroadcastReduceTest.cs
index fe098a8..583960e 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Group/PipelinedBroadcastReduceTest.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Group/PipelinedBroadcastReduceTest.cs
@@ -27,6 +27,7 @@ using Org.Apache.REEF.Driver.Bridge;
 using Org.Apache.REEF.Network.Examples.GroupCommunication;
 using 
Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastReduceDriverAndTasks;
 using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Naming;
 using Org.Apache.REEF.Network.NetworkService;
 using Org.Apache.REEF.Tang.Implementations.Configuration;
 using Org.Apache.REEF.Tang.Implementations.Tang;
@@ -70,13 +71,12 @@ namespace Org.Apache.REEF.Tests.Functional.Group
         public void TestBroadcastAndReduce(bool runOnYarn, int numTasks)
         {
             IConfiguration driverConfig = 
TangFactory.GetTang().NewConfigurationBuilder(
-                DriverBridgeConfiguration.ConfigurationModule
-                    .Set(DriverBridgeConfiguration.OnDriverStarted, 
GenericType<PipelinedBroadcastReduceDriver>.Class)
-                    .Set(DriverBridgeConfiguration.OnEvaluatorAllocated, 
GenericType<PipelinedBroadcastReduceDriver>.Class)
-                    .Set(DriverBridgeConfiguration.OnEvaluatorRequested, 
GenericType<PipelinedBroadcastReduceDriver>.Class)
-                    .Set(DriverBridgeConfiguration.OnEvaluatorFailed, 
GenericType<PipelinedBroadcastReduceDriver>.Class)
-                    .Set(DriverBridgeConfiguration.OnContextActive, 
GenericType<PipelinedBroadcastReduceDriver>.Class)
-                    .Set(DriverBridgeConfiguration.CustomTraceLevel, 
Level.Info.ToString())
+                DriverConfiguration.ConfigurationModule
+                    .Set(DriverConfiguration.OnDriverStarted, 
GenericType<PipelinedBroadcastReduceDriver>.Class)
+                    .Set(DriverConfiguration.OnEvaluatorAllocated, 
GenericType<PipelinedBroadcastReduceDriver>.Class)
+                    .Set(DriverConfiguration.OnEvaluatorFailed, 
GenericType<PipelinedBroadcastReduceDriver>.Class)
+                    .Set(DriverConfiguration.OnContextActive, 
GenericType<PipelinedBroadcastReduceDriver>.Class)
+                    .Set(DriverConfiguration.CustomTraceLevel, 
Level.Info.ToString())
                     .Build())
                 .BindNamedParameter<GroupTestConfig.NumIterations, int>(
                     GenericType<GroupTestConfig.NumIterations>.Class,
@@ -98,7 +98,14 @@ namespace Org.Apache.REEF.Tests.Functional.Group
                 .Build();
 
             IConfiguration merged = Configurations.Merge(driverConfig, 
groupCommDriverConfig);
-                    
+
+            IConfiguration taskConfig = 
TangFactory.GetTang().NewConfigurationBuilder()
+                
.BindSetEntry<DriverBridgeConfigurationOptions.SetOfAssemblies, 
string>(typeof(PipelinedMasterTask).Assembly.GetName().Name)
+                
.BindSetEntry<DriverBridgeConfigurationOptions.SetOfAssemblies, 
string>(typeof(NameClient).Assembly.GetName().Name)
+                .Build();
+
+            merged = Configurations.Merge(merged, taskConfig);
+
             HashSet<string> appDlls = new HashSet<string>();
             appDlls.Add(typeof(IDriver).Assembly.GetName().Name);
             appDlls.Add(typeof(ITask).Assembly.GetName().Name);

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/803028be/lang/cs/Org.Apache.REEF.Tests/Functional/Group/ScatterReduceTest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Group/ScatterReduceTest.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Group/ScatterReduceTest.cs
index 868b3a1..5b4e28b 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Group/ScatterReduceTest.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Group/ScatterReduceTest.cs
@@ -27,6 +27,7 @@ using Org.Apache.REEF.Driver.Bridge;
 using Org.Apache.REEF.Network.Examples.GroupCommunication;
 using 
Org.Apache.REEF.Network.Examples.GroupCommunication.ScatterReduceDriverAndTasks;
 using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Naming;
 using Org.Apache.REEF.Network.NetworkService;
 using Org.Apache.REEF.Tang.Implementations.Configuration;
 using Org.Apache.REEF.Tang.Implementations.Tang;
@@ -71,13 +72,12 @@ namespace Org.Apache.REEF.Tests.Functional.Group
         public void TestScatterAndReduce(bool runOnYarn, int numTasks)
         {
             IConfiguration driverConfig = 
TangFactory.GetTang().NewConfigurationBuilder(
-                DriverBridgeConfiguration.ConfigurationModule
-                    .Set(DriverBridgeConfiguration.OnDriverStarted, 
GenericType<ScatterReduceDriver>.Class)
-                    .Set(DriverBridgeConfiguration.OnEvaluatorAllocated, 
GenericType<ScatterReduceDriver>.Class)
-                    .Set(DriverBridgeConfiguration.OnEvaluatorRequested, 
GenericType<ScatterReduceDriver>.Class)
-                    .Set(DriverBridgeConfiguration.OnEvaluatorFailed, 
GenericType<ScatterReduceDriver>.Class)
-                    .Set(DriverBridgeConfiguration.OnContextActive, 
GenericType<ScatterReduceDriver>.Class)
-                    .Set(DriverBridgeConfiguration.CustomTraceLevel, 
Level.Info.ToString())
+                DriverConfiguration.ConfigurationModule
+                    .Set(DriverConfiguration.OnDriverStarted, 
GenericType<ScatterReduceDriver>.Class)
+                    .Set(DriverConfiguration.OnEvaluatorAllocated, 
GenericType<ScatterReduceDriver>.Class)
+                    .Set(DriverConfiguration.OnEvaluatorFailed, 
GenericType<ScatterReduceDriver>.Class)
+                    .Set(DriverConfiguration.OnContextActive, 
GenericType<ScatterReduceDriver>.Class)
+                    .Set(DriverConfiguration.CustomTraceLevel, 
Level.Info.ToString())
                     .Build())
                 .BindNamedParameter<GroupTestConfig.NumEvaluators, int>(
                     GenericType<GroupTestConfig.NumEvaluators>.Class,
@@ -94,6 +94,13 @@ namespace Org.Apache.REEF.Tests.Functional.Group
 
             IConfiguration merged = Configurations.Merge(driverConfig, 
groupCommDriverConfig);
 
+            IConfiguration taskConfig = 
TangFactory.GetTang().NewConfigurationBuilder()
+                
.BindSetEntry<DriverBridgeConfigurationOptions.SetOfAssemblies, 
string>(typeof(MasterTask).Assembly.GetName().Name)
+                
.BindSetEntry<DriverBridgeConfigurationOptions.SetOfAssemblies, 
string>(typeof(NameClient).Assembly.GetName().Name)
+                .Build();
+
+            merged = Configurations.Merge(merged, taskConfig);
+
             HashSet<string> appDlls = new HashSet<string>();
             appDlls.Add(typeof(IDriver).Assembly.GetName().Name);
             appDlls.Add(typeof(ITask).Assembly.GetName().Name);

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/803028be/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs
index 765944c..87ce4db 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs
@@ -27,6 +27,7 @@ using Org.Apache.REEF.Common.Tasks;
 using Org.Apache.REEF.Driver.Bridge;
 using Org.Apache.REEF.Examples.MachineLearning.KMeans;
 using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Naming;
 using Org.Apache.REEF.Network.NetworkService;
 using Org.Apache.REEF.Tang.Implementations.Configuration;
 using Org.Apache.REEF.Tang.Implementations.Tang;
@@ -149,13 +150,12 @@ namespace Org.Apache.REEF.Tests.Functional.ML.KMeans
             string Identifier = "KMeansDriverId";
 
             IConfiguration driverConfig = 
TangFactory.GetTang().NewConfigurationBuilder(
-                DriverBridgeConfiguration.ConfigurationModule
-                    .Set(DriverBridgeConfiguration.OnDriverStarted, 
GenericType<KMeansDriverHandlers>.Class)
-                    .Set(DriverBridgeConfiguration.OnEvaluatorAllocated, 
GenericType<KMeansDriverHandlers>.Class)
-                    .Set(DriverBridgeConfiguration.OnEvaluatorRequested, 
GenericType<KMeansDriverHandlers>.Class)
-                    .Set(DriverBridgeConfiguration.OnContextActive, 
GenericType<KMeansDriverHandlers>.Class)
-                    .Set(DriverBridgeConfiguration.CommandLineArguments, 
"DataFile:" + _dataFile)
-                    .Set(DriverBridgeConfiguration.CustomTraceLevel, 
Level.Info.ToString())
+                Org.Apache.REEF.Driver.DriverConfiguration.ConfigurationModule
+                    
.Set(Org.Apache.REEF.Driver.DriverConfiguration.OnDriverStarted, 
GenericType<KMeansDriverHandlers>.Class)
+                    
.Set(Org.Apache.REEF.Driver.DriverConfiguration.OnEvaluatorAllocated, 
GenericType<KMeansDriverHandlers>.Class)
+                    
.Set(Org.Apache.REEF.Driver.DriverConfiguration.OnContextActive, 
GenericType<KMeansDriverHandlers>.Class)
+                    
.Set(Org.Apache.REEF.Driver.DriverConfiguration.CommandLineArguments, 
"DataFile:" + _dataFile)
+                    
.Set(Org.Apache.REEF.Driver.DriverConfiguration.CustomTraceLevel, 
Level.Info.ToString())
                     .Build())
                 .BindIntNamedParam<NumPartitions>(Partitions.ToString())
                 .Build();
@@ -168,7 +168,14 @@ namespace Org.Apache.REEF.Tests.Functional.ML.KMeans
                 
.BindIntNamedParam<GroupCommConfigurationOptions.NumberOfTasks>(totalEvaluators.ToString())
                 .Build();
 
-            return Configurations.Merge(driverConfig, 
groupCommunicationDriverConfig);
+            IConfiguration merged = Configurations.Merge(driverConfig, 
groupCommunicationDriverConfig);
+
+            IConfiguration taskConfig = 
TangFactory.GetTang().NewConfigurationBuilder()
+                
.BindSetEntry<DriverBridgeConfigurationOptions.SetOfAssemblies, 
string>(typeof(KMeansMasterTask).Assembly.GetName().Name)
+                
.BindSetEntry<DriverBridgeConfigurationOptions.SetOfAssemblies, 
string>(typeof(NameClient).Assembly.GetName().Name)
+                .Build();
+
+            return Configurations.Merge(merged, taskConfig);
         }
 
         private HashSet<string> AssembliesToCopy()

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/803028be/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/MessageDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/MessageDriver.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/MessageDriver.cs
index a08712b..2b9e532 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/MessageDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/MessageDriver.cs
@@ -36,23 +36,25 @@ using IRunningTask = Org.Apache.REEF.Driver.Task.IRunningTask;
 
 namespace Org.Apache.REEF.Tests.Functional.Messaging
 {
-    public class MessageDriver : IStartHandler, 
IObserver<IAllocatedEvaluator>, IObserver<IEvaluatorRequestor>, 
IObserver<ITaskMessage>, IObserver<IRunningTask>
+    public class MessageDriver :
+        IObserver<IAllocatedEvaluator>, 
+        IObserver<ITaskMessage>, 
+        IObserver<IRunningTask>, 
+        IObserver<IDriverStarted>
     {
         public const int NumerOfEvaluator = 1;
 
         public const string Message = "MESSAGE::DRIVER";
 
         private static readonly Logger LOGGER = 
Logger.GetLogger(typeof(MessageDriver));
+        private readonly IEvaluatorRequestor _evaluatorRequestor;
 
         [Inject]
-        public MessageDriver()
+        public MessageDriver(IEvaluatorRequestor evaluatorRequestor)
         {
             CreateClassHierarchy();
-            Identifier = "TaskMessagingStartHandler";
+            _evaluatorRequestor = evaluatorRequestor;
         }
-
-        public string Identifier { get; set; }
-
         public void OnNext(IAllocatedEvaluator eval)
         {
             string taskId = "Task_" + eval.Id;
@@ -71,12 +73,6 @@ namespace Org.Apache.REEF.Tests.Functional.Messaging
             eval.SubmitContextAndTask(contextConfiguration, taskConfiguration);
         }
 
-        public void OnNext(IEvaluatorRequestor evalutorRequestor)
-        {
-            EvaluatorRequest request = new EvaluatorRequest(NumerOfEvaluator, 
512, 2, "WonderlandRack", "TaskMessagingEvaluator");
-            evalutorRequestor.Submit(request);
-        }
-
         public void OnNext(ITaskMessage taskMessage)
         {
             string msgReceived = 
ByteUtilities.ByteArrarysToString(taskMessage.Message);
@@ -95,6 +91,12 @@ namespace Org.Apache.REEF.Tests.Functional.Messaging
             runningTask.Send(ByteUtilities.StringToByteArrays(Message));
         }
 
+        public void OnNext(IDriverStarted value)
+        {
+            EvaluatorRequest request = new EvaluatorRequest(NumerOfEvaluator, 
512, 2, "WonderlandRack", "TaskMessagingEvaluator");
+            _evaluatorRequestor.Submit(request);
+        }
+
         public void OnCompleted()
         {
             throw new NotImplementedException();

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/803028be/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/TestTaskMessage.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/TestTaskMessage.cs b/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/TestTaskMessage.cs
index 7a0b59a..063e0ff 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/TestTaskMessage.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/TestTaskMessage.cs
@@ -19,8 +19,12 @@
 
 using System.Collections.Generic;
 using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Driver;
 using Org.Apache.REEF.Driver.Bridge;
 using Org.Apache.REEF.Driver.Defaults;
+using Org.Apache.REEF.Network.Naming;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Tang.Implementations.Tang;
 using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Tang.Util;
 using Org.Apache.REEF.Utilities.Logging;
@@ -53,21 +57,27 @@ namespace Org.Apache.REEF.Tests.Functional.Messaging
         [Timeout(180 * 1000)]
         public void TestSendTaskMessage()
         {
-            IConfiguration driverConfig = 
DriverBridgeConfiguration.ConfigurationModule
-             .Set(DriverBridgeConfiguration.OnDriverStarted, 
GenericType<MessageDriver>.Class)
-             .Set(DriverBridgeConfiguration.OnEvaluatorAllocated, 
GenericType<MessageDriver>.Class)
-             .Set(DriverBridgeConfiguration.OnTaskMessage, 
GenericType<MessageDriver>.Class)
-             .Set(DriverBridgeConfiguration.OnTaskRunning, 
GenericType<MessageDriver>.Class)
-             .Set(DriverBridgeConfiguration.OnEvaluatorRequested, 
GenericType<MessageDriver>.Class)
-             .Set(DriverBridgeConfiguration.CustomTraceListeners, 
GenericType<DefaultCustomTraceListener>.Class)
-             .Set(DriverBridgeConfiguration.CustomTraceLevel, 
Level.Info.ToString())
+            IConfiguration driverConfig = 
DriverConfiguration.ConfigurationModule
+             .Set(DriverConfiguration.OnDriverStarted, 
GenericType<MessageDriver>.Class)
+             .Set(DriverConfiguration.OnEvaluatorAllocated, 
GenericType<MessageDriver>.Class)
+             .Set(DriverConfiguration.OnTaskMessage, 
GenericType<MessageDriver>.Class)
+             .Set(DriverConfiguration.OnTaskRunning, 
GenericType<MessageDriver>.Class)
+             .Set(DriverConfiguration.CustomTraceListeners, 
GenericType<DefaultCustomTraceListener>.Class)
+             .Set(DriverConfiguration.CustomTraceLevel, Level.Info.ToString())
              .Build();
 
+            IConfiguration taskConfig = 
TangFactory.GetTang().NewConfigurationBuilder()
+                
.BindSetEntry<DriverBridgeConfigurationOptions.SetOfAssemblies, 
string>(typeof(MessageTask).Assembly.GetName().Name)
+                
.BindSetEntry<DriverBridgeConfigurationOptions.SetOfAssemblies, 
string>(typeof(NameClient).Assembly.GetName().Name)
+                .Build();
+
+            IConfiguration merged = Configurations.Merge(driverConfig, 
taskConfig);
+
             HashSet<string> appDlls = new HashSet<string>();
             appDlls.Add(typeof(MessageDriver).Assembly.GetName().Name);
             appDlls.Add(typeof(MessageTask).Assembly.GetName().Name);
 
-            TestRun(appDlls, driverConfig);
+            TestRun(appDlls, merged);
             ValidateSuccessForLocalRuntime(MessageDriver.NumerOfEvaluator);
         }
     }

Reply via email to