Repository: incubator-reef
Updated Branches:
refs/heads/master cb31da9f2 -> 803028bee
[REEF-624] Replace all use of DriverBridgeConfiguration with DriverConfiguration
This addressed the issue by
* replacing all usages of DriverBridgeConfiguration in examples and tests
with DriverConfiguration
* adding IObservable<IDriverStarted> and necessary methods to test drivers
* removing unused IStartHandler from test drivers
JIRA:
[REEF-624](https://issues.apache.org/jira/browse/REEF-624)
Pull Request:
Closes #450
Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/803028be
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/803028be
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/803028be
Branch: refs/heads/master
Commit: 803028bee1e9bdf75d380e95073669e1bb00f328
Parents: cb31da9
Author: Mariia Mykhailova <[email protected]>
Authored: Mon Aug 31 16:39:05 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Tue Sep 1 17:25:01 2015 -0700
----------------------------------------------------------------------
.../KMeans/KMeansDriverHandlers.cs | 34 ++++++++++----------
.../BroadcastAndReduceClient.cs | 21 ++++++++----
.../PipelineBroadcastAndReduceClient.cs | 24 ++++++++------
.../BroadcastReduceDriver.cs | 27 +++++++++-------
.../PipelinedBroadcastReduceDriver.cs | 27 +++++++++-------
.../ScatterReduceDriver.cs | 27 +++++++++-------
.../Functional/Driver/DriverTestStartHandler.cs | 17 ++++++++--
.../Functional/Driver/TestDriver.cs | 11 ++++---
.../Functional/Group/BroadcastReduceTest.cs | 23 ++++++++-----
.../Group/PipelinedBroadcastReduceTest.cs | 23 ++++++++-----
.../Functional/Group/ScatterReduceTest.cs | 21 ++++++++----
.../Functional/ML/KMeans/TestKMeans.cs | 23 ++++++++-----
.../Functional/Messaging/MessageDriver.cs | 26 ++++++++-------
.../Functional/Messaging/TestTaskMessage.cs | 28 ++++++++++------
14 files changed, 206 insertions(+), 126 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/803028be/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
index ae22701..24e82dd 100644
---
a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
+++
b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs
@@ -47,10 +47,9 @@ using Org.Apache.REEF.Network.Group.Topology;
namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
{
public class KMeansDriverHandlers :
- IStartHandler,
- IObserver<IEvaluatorRequestor>,
IObserver<IAllocatedEvaluator>,
- IObserver<IActiveContext>
+ IObserver<IActiveContext>,
+ IObserver<IDriverStarted>
{
private static readonly Logger _Logger =
Logger.GetLogger(typeof(KMeansDriverHandlers));
private readonly object _lockObj = new object();
@@ -66,12 +65,14 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
private readonly IConfiguration _centroidCodecConf;
private readonly IConfiguration _controlMessageCodecConf;
private readonly IConfiguration _processedResultsCodecConf;
-
+ private readonly IEvaluatorRequestor _evaluatorRequestor;
[Inject]
- public KMeansDriverHandlers([Parameter(typeof(NumPartitions))] int
numPartitions, GroupCommDriver groupCommDriver)
+ public KMeansDriverHandlers(
+ [Parameter(typeof(NumPartitions))] int numPartitions,
+ GroupCommDriver groupCommDriver,
+ IEvaluatorRequestor evaluatorRequestor)
{
- Identifier = "KMeansDriverId";
_executionDirectory =
Path.Combine(Directory.GetCurrentDirectory(),
Constants.KMeansExecutionBaseDirectory,
Guid.NewGuid().ToString("N").Substring(0, 4));
ISet<string> arguments =
ClrHandlerHelper.GetCommandLineArguments();
string dataFile = arguments.Single(a => a.StartsWith("DataFile",
StringComparison.Ordinal)).Split(':')[1];
@@ -84,6 +85,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
_totalEvaluators = numPartitions + 1;
_groupCommDriver = groupCommDriver;
+ _evaluatorRequestor = evaluatorRequestor;
_centroidCodecConf =
CodecToStreamingCodecConfiguration<Centroids>.Conf
.Set(CodecConfiguration<Centroids>.Codec,
GenericType<CentroidsCodec>.Class)
@@ -124,17 +126,6 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
CreateClassHierarchy();
}
- public string Identifier { get; set; }
-
- public void OnNext(IEvaluatorRequestor evalutorRequestor)
- {
- int memory = 2048;
- int core = 1;
- EvaluatorRequest request = new EvaluatorRequest(_totalEvaluators,
memory, core);
-
- evalutorRequestor.Submit(request);
- }
-
public void OnNext(IAllocatedEvaluator allocatedEvaluator)
{
IConfiguration contextConfiguration =
_groupCommDriver.GetContextConfiguration();
@@ -197,6 +188,15 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans
_groupCommTaskStarter.QueueTask(taskConfiguration, activeContext);
}
+ public void OnNext(IDriverStarted value)
+ {
+ int memory = 2048;
+ int core = 1;
+ EvaluatorRequest request = new EvaluatorRequest(_totalEvaluators,
memory, core);
+
+ _evaluatorRequestor.Submit(request);
+ }
+
public void OnError(Exception error)
{
throw new NotImplementedException();
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/803028be/lang/cs/Org.Apache.REEF.Network.Examples.Client/BroadcastAndReduceClient.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.Network.Examples.Client/BroadcastAndReduceClient.cs
b/lang/cs/Org.Apache.REEF.Network.Examples.Client/BroadcastAndReduceClient.cs
index 49a9b45..9d21d8b 100644
---
a/lang/cs/Org.Apache.REEF.Network.Examples.Client/BroadcastAndReduceClient.cs
+++
b/lang/cs/Org.Apache.REEF.Network.Examples.Client/BroadcastAndReduceClient.cs
@@ -26,6 +26,7 @@ using Org.Apache.REEF.Driver.Bridge;
using Org.Apache.REEF.Network.Examples.GroupCommunication;
using
Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDriverAndTasks;
using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Naming;
using Org.Apache.REEF.Network.NetworkService;
using Org.Apache.REEF.Tang.Implementations.Configuration;
using Org.Apache.REEF.Tang.Implementations.Tang;
@@ -46,13 +47,12 @@ namespace Org.Apache.REEF.Network.Examples.Client
const int fanOut = 2;
IConfiguration driverConfig =
TangFactory.GetTang().NewConfigurationBuilder(
- DriverBridgeConfiguration.ConfigurationModule
- .Set(DriverBridgeConfiguration.OnDriverStarted,
GenericType<BroadcastReduceDriver>.Class)
- .Set(DriverBridgeConfiguration.OnEvaluatorAllocated,
GenericType<BroadcastReduceDriver>.Class)
- .Set(DriverBridgeConfiguration.OnEvaluatorRequested,
GenericType<BroadcastReduceDriver>.Class)
- .Set(DriverBridgeConfiguration.OnEvaluatorFailed,
GenericType<BroadcastReduceDriver>.Class)
- .Set(DriverBridgeConfiguration.OnContextActive,
GenericType<BroadcastReduceDriver>.Class)
- .Set(DriverBridgeConfiguration.CustomTraceLevel,
Level.Info.ToString())
+ DriverConfiguration.ConfigurationModule
+ .Set(DriverConfiguration.OnDriverStarted,
GenericType<BroadcastReduceDriver>.Class)
+ .Set(DriverConfiguration.OnEvaluatorAllocated,
GenericType<BroadcastReduceDriver>.Class)
+ .Set(DriverConfiguration.OnEvaluatorFailed,
GenericType<BroadcastReduceDriver>.Class)
+ .Set(DriverConfiguration.OnContextActive,
GenericType<BroadcastReduceDriver>.Class)
+ .Set(DriverConfiguration.CustomTraceLevel,
Level.Info.ToString())
.Build())
.BindNamedParameter<GroupTestConfig.NumIterations, int>(
GenericType<GroupTestConfig.NumIterations>.Class,
@@ -78,6 +78,13 @@ namespace Org.Apache.REEF.Network.Examples.Client
IConfiguration merged = Configurations.Merge(driverConfig,
groupCommDriverConfig);
+ IConfiguration taskConfig =
TangFactory.GetTang().NewConfigurationBuilder()
+
.BindSetEntry<DriverBridgeConfigurationOptions.SetOfAssemblies,
string>(typeof(MasterTask).Assembly.GetName().Name)
+
.BindSetEntry<DriverBridgeConfigurationOptions.SetOfAssemblies,
string>(typeof(NameClient).Assembly.GetName().Name)
+ .Build();
+
+ merged = Configurations.Merge(merged, taskConfig);
+
HashSet<string> appDlls = new HashSet<string>();
appDlls.Add(typeof(IDriver).Assembly.GetName().Name);
appDlls.Add(typeof(ITask).Assembly.GetName().Name);
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/803028be/lang/cs/Org.Apache.REEF.Network.Examples.Client/PipelineBroadcastAndReduceClient.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.Network.Examples.Client/PipelineBroadcastAndReduceClient.cs
b/lang/cs/Org.Apache.REEF.Network.Examples.Client/PipelineBroadcastAndReduceClient.cs
index 16aa698..76aa58b 100644
---
a/lang/cs/Org.Apache.REEF.Network.Examples.Client/PipelineBroadcastAndReduceClient.cs
+++
b/lang/cs/Org.Apache.REEF.Network.Examples.Client/PipelineBroadcastAndReduceClient.cs
@@ -30,6 +30,7 @@ using Org.Apache.REEF.Driver.Bridge;
using Org.Apache.REEF.Network.Examples.GroupCommunication;
using
Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastReduceDriverAndTasks;
using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Naming;
using Org.Apache.REEF.Network.NetworkService;
using Org.Apache.REEF.Tang.Implementations.Configuration;
using Org.Apache.REEF.Tang.Implementations.Tang;
@@ -44,15 +45,12 @@ namespace Org.Apache.REEF.Network.Examples.Client
public void RunPipelineBroadcastAndReduce(bool runOnYarn, int
numTasks, int startingPortNo, int portRange, int arraySize, int chunkSize)
{
IConfiguration driverConfig =
TangFactory.GetTang().NewConfigurationBuilder(
- DriverBridgeConfiguration.ConfigurationModule
- .Set(DriverBridgeConfiguration.OnDriverStarted,
GenericType<PipelinedBroadcastReduceDriver>.Class)
- .Set(DriverBridgeConfiguration.OnEvaluatorAllocated,
- GenericType<PipelinedBroadcastReduceDriver>.Class)
- .Set(DriverBridgeConfiguration.OnEvaluatorRequested,
- GenericType<PipelinedBroadcastReduceDriver>.Class)
- .Set(DriverBridgeConfiguration.OnEvaluatorFailed,
GenericType<PipelinedBroadcastReduceDriver>.Class)
- .Set(DriverBridgeConfiguration.OnContextActive,
GenericType<PipelinedBroadcastReduceDriver>.Class)
- .Set(DriverBridgeConfiguration.CustomTraceLevel,
Level.Info.ToString())
+ DriverConfiguration.ConfigurationModule
+ .Set(DriverConfiguration.OnDriverStarted,
GenericType<PipelinedBroadcastReduceDriver>.Class)
+ .Set(DriverConfiguration.OnEvaluatorAllocated,
GenericType<PipelinedBroadcastReduceDriver>.Class)
+ .Set(DriverConfiguration.OnEvaluatorFailed,
GenericType<PipelinedBroadcastReduceDriver>.Class)
+ .Set(DriverConfiguration.OnContextActive,
GenericType<PipelinedBroadcastReduceDriver>.Class)
+ .Set(DriverConfiguration.CustomTraceLevel,
Level.Info.ToString())
.Build())
.BindNamedParameter<GroupTestConfig.NumIterations, int>(
GenericType<GroupTestConfig.NumIterations>.Class,
@@ -84,6 +82,14 @@ namespace Org.Apache.REEF.Network.Examples.Client
IConfiguration merged = Configurations.Merge(driverConfig,
groupCommDriverConfig);
+ IConfiguration taskConfig =
TangFactory.GetTang().NewConfigurationBuilder()
+
.BindSetEntry<DriverBridgeConfigurationOptions.SetOfAssemblies,
string>(typeof(PipelinedMasterTask).Assembly.GetName().Name)
+
.BindSetEntry<DriverBridgeConfigurationOptions.SetOfAssemblies,
string>(typeof(PipelinedSlaveTask).Assembly.GetName().Name)
+
.BindSetEntry<DriverBridgeConfigurationOptions.SetOfAssemblies,
string>(typeof(NameClient).Assembly.GetName().Name)
+ .Build();
+
+ merged = Configurations.Merge(merged, taskConfig);
+
HashSet<string> appDlls = new HashSet<string>();
appDlls.Add(typeof(IDriver).Assembly.GetName().Name);
appDlls.Add(typeof(ITask).Assembly.GetName().Name);
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/803028be/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs
index 051b23d..84082af 100644
---
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs
+++
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs
@@ -47,7 +47,11 @@ using Org.Apache.REEF.Wake.Remote.Parameters;
namespace
Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDriverAndTasks
{
- public class BroadcastReduceDriver : IStartHandler,
IObserver<IEvaluatorRequestor>, IObserver<IAllocatedEvaluator>,
IObserver<IActiveContext>, IObserver<IFailedEvaluator>
+ public class BroadcastReduceDriver :
+ IObserver<IAllocatedEvaluator>,
+ IObserver<IActiveContext>,
+ IObserver<IFailedEvaluator>,
+ IObserver<IDriverStarted>
{
private static readonly Logger LOGGER =
Logger.GetLogger(typeof(BroadcastReduceDriver));
@@ -59,6 +63,7 @@ namespace
Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri
private readonly TaskStarter _groupCommTaskStarter;
private readonly IConfiguration _tcpPortProviderConfig;
private readonly IConfiguration _codecConfig;
+ private readonly IEvaluatorRequestor _evaluatorRequestor;
[Inject]
public BroadcastReduceDriver(
@@ -66,12 +71,13 @@ namespace
Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri
[Parameter(typeof(GroupTestConfig.NumIterations))] int
numIterations,
[Parameter(typeof(GroupTestConfig.StartingPort))] int startingPort,
[Parameter(typeof(GroupTestConfig.PortRange))] int portRange,
- GroupCommDriver groupCommDriver)
+ GroupCommDriver groupCommDriver,
+ IEvaluatorRequestor evaluatorRequestor)
{
- Identifier = "BroadcastStartHandler";
_numEvaluators = numEvaluators;
_numIterations = numIterations;
_groupCommDriver = groupCommDriver;
+ _evaluatorRequestor = evaluatorRequestor;
_tcpPortProviderConfig =
TangFactory.GetTang().NewConfigurationBuilder()
.BindNamedParameter<TcpPortRangeStart,
int>(GenericType<TcpPortRangeStart>.Class,
@@ -112,14 +118,6 @@ namespace
Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri
CreateClassHierarchy();
}
- public string Identifier { get; set; }
-
- public void OnNext(IEvaluatorRequestor evaluatorRequestor)
- {
- EvaluatorRequest request = new EvaluatorRequest(_numEvaluators,
512, 2, "WonderlandRack", "BroadcastEvaluator");
- evaluatorRequestor.Submit(request);
- }
-
public void OnNext(IAllocatedEvaluator allocatedEvaluator)
{
IConfiguration contextConf =
_groupCommDriver.GetContextConfiguration();
@@ -177,8 +175,15 @@ namespace
Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri
{
}
+ public void OnNext(IDriverStarted value)
+ {
+ EvaluatorRequest request = new EvaluatorRequest(_numEvaluators,
512, 2, "WonderlandRack", "BroadcastEvaluator");
+ _evaluatorRequestor.Submit(request);
+ }
+
public void OnError(Exception error)
{
+ throw error;
}
public void OnCompleted()
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/803028be/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs
index 6675fb5..32439bd 100644
---
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs
+++
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs
@@ -47,8 +47,11 @@ using
Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs;
namespace
Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastReduceDriverAndTasks
{
- public class PipelinedBroadcastReduceDriver : IStartHandler,
IObserver<IEvaluatorRequestor>,
- IObserver<IAllocatedEvaluator>, IObserver<IActiveContext>,
IObserver<IFailedEvaluator>
+ public class PipelinedBroadcastReduceDriver :
+ IObserver<IAllocatedEvaluator>,
+ IObserver<IActiveContext>,
+ IObserver<IFailedEvaluator>,
+ IObserver<IDriverStarted>
{
private static readonly Logger Logger =
Logger.GetLogger(typeof(PipelinedBroadcastReduceDriver));
private readonly int _arraySize;
@@ -59,6 +62,7 @@ namespace
Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
private readonly int _numIterations;
private readonly IConfiguration _tcpPortProviderConfig;
private readonly IConfiguration _codecConfig;
+ private readonly IEvaluatorRequestor _evaluatorRequestor;
[Inject]
public PipelinedBroadcastReduceDriver(
@@ -68,14 +72,15 @@ namespace
Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
[Parameter(typeof(GroupTestConfig.PortRange))] int portRange,
[Parameter(typeof(GroupTestConfig.ChunkSize))] int chunkSize,
[Parameter(typeof(GroupTestConfig.ArraySize))] int arraySize,
- GroupCommDriver groupCommDriver)
+ GroupCommDriver groupCommDriver,
+ IEvaluatorRequestor evaluatorRequestor)
{
Logger.Log(Level.Info, "entering the driver code " + chunkSize);
- Identifier = "BroadcastStartHandler";
_numEvaluators = numEvaluators;
_numIterations = numIterations;
_arraySize = arraySize;
+ _evaluatorRequestor = evaluatorRequestor;
_tcpPortProviderConfig =
TangFactory.GetTang().NewConfigurationBuilder()
.BindNamedParameter<TcpPortRangeStart,
int>(GenericType<TcpPortRangeStart>.Class,
@@ -122,13 +127,6 @@ namespace
Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
CreateClassHierarchy();
}
- public string Identifier { get; set; }
-
- public void OnNext(IEvaluatorRequestor evaluatorRequestor)
- {
- EvaluatorRequest request = new EvaluatorRequest(_numEvaluators,
512, 2, "WonderlandRack", "BroadcastEvaluator");
- evaluatorRequestor.Submit(request);
- }
public void OnNext(IAllocatedEvaluator allocatedEvaluator)
{
@@ -190,6 +188,7 @@ namespace
Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
public void OnError(Exception error)
{
+ throw error;
}
public void OnCompleted()
@@ -200,6 +199,12 @@ namespace
Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR
{
}
+ public void OnNext(IDriverStarted value)
+ {
+ EvaluatorRequest request = new EvaluatorRequest(_numEvaluators,
512, 2, "WonderlandRack", "BroadcastEvaluator");
+ _evaluatorRequestor.Submit(request);
+ }
+
private void CreateClassHierarchy()
{
var clrDlls = new HashSet<string>();
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/803028be/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs
index 10595fd..d2a7ea5 100644
---
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs
+++
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs
@@ -47,7 +47,11 @@ using
Org.Apache.REEF.Wake.StreamingCodec.CommonStreamingCodecs;
namespace
Org.Apache.REEF.Network.Examples.GroupCommunication.ScatterReduceDriverAndTasks
{
- public class ScatterReduceDriver : IStartHandler,
IObserver<IEvaluatorRequestor>, IObserver<IAllocatedEvaluator>,
IObserver<IActiveContext>, IObserver<IFailedEvaluator>
+ public class ScatterReduceDriver :
+ IObserver<IAllocatedEvaluator>,
+ IObserver<IActiveContext>,
+ IObserver<IFailedEvaluator>,
+ IObserver<IDriverStarted>
{
private static readonly Logger LOGGER =
Logger.GetLogger(typeof(ScatterReduceDriver));
@@ -57,15 +61,17 @@ namespace
Org.Apache.REEF.Network.Examples.GroupCommunication.ScatterReduceDrive
private readonly ICommunicationGroupDriver _commGroup;
private readonly TaskStarter _groupCommTaskStarter;
private readonly IConfiguration _codecConfig;
+ private readonly IEvaluatorRequestor _evaluatorRequestor;
[Inject]
public ScatterReduceDriver(
[Parameter(typeof(GroupTestConfig.NumEvaluators))] int
numEvaluators,
- GroupCommDriver groupCommDriver)
+ GroupCommDriver groupCommDriver,
+ IEvaluatorRequestor evaluatorRequestor)
{
- Identifier = "BroadcastStartHandler";
_numEvaluators = numEvaluators;
_groupCommDriver = groupCommDriver;
+ _evaluatorRequestor = evaluatorRequestor;
_codecConfig = StreamingCodecConfiguration<int>.Conf
.Set(StreamingCodecConfiguration<int>.Codec,
GenericType<IntStreamingCodec>.Class)
@@ -99,14 +105,6 @@ namespace
Org.Apache.REEF.Network.Examples.GroupCommunication.ScatterReduceDrive
CreateClassHierarchy();
}
- public string Identifier { get; set; }
-
- public void OnNext(IEvaluatorRequestor evaluatorRequestor)
- {
- EvaluatorRequest request = new EvaluatorRequest(_numEvaluators,
512, 2, "WonderlandRack", "BroadcastEvaluator");
- evaluatorRequestor.Submit(request);
- }
-
public void OnNext(IAllocatedEvaluator allocatedEvaluator)
{
IConfiguration contextConf =
_groupCommDriver.GetContextConfiguration();
@@ -148,8 +146,15 @@ namespace
Org.Apache.REEF.Network.Examples.GroupCommunication.ScatterReduceDrive
{
}
+ public void OnNext(IDriverStarted value)
+ {
+ EvaluatorRequest request = new EvaluatorRequest(_numEvaluators,
512, 2, "WonderlandRack", "BroadcastEvaluator");
+ _evaluatorRequestor.Submit(request);
+ }
+
public void OnError(Exception error)
{
+ throw error;
}
public void OnCompleted()
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/803028be/lang/cs/Org.Apache.REEF.Tests/Functional/Driver/DriverTestStartHandler.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.Tests/Functional/Driver/DriverTestStartHandler.cs
b/lang/cs/Org.Apache.REEF.Tests/Functional/Driver/DriverTestStartHandler.cs
index c5f6ecd..385658f 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Driver/DriverTestStartHandler.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Driver/DriverTestStartHandler.cs
@@ -17,6 +17,7 @@
* under the License.
*/
+using System;
using Org.Apache.REEF.Driver;
using Org.Apache.REEF.Driver.Bridge;
using Org.Apache.REEF.Tang.Annotations;
@@ -25,7 +26,7 @@ using Org.Apache.REEF.Wake.Time;
namespace Org.Apache.REEF.Tests.Functional.Driver
{
- public class DriverTestStartHandler : IStartHandler
+ public class DriverTestStartHandler : IObserver<IDriverStarted>
{
private static readonly Logger LOGGER =
Logger.GetLogger(typeof(DriverTestStartHandler));
@@ -37,10 +38,20 @@ namespace Org.Apache.REEF.Tests.Functional.Driver
{
_clock = clock;
_httpServerPort = httpServerPort;
- Identifier = "DriverTestStartHandler";
LOGGER.Log(Level.Info, "Http Server port number: " +
httpServerPort.PortNumber);
}
- public string Identifier { get; set; }
+ public void OnNext(IDriverStarted value)
+ {
+ }
+
+ public void OnError(Exception error)
+ {
+ throw error;
+ }
+
+ public void OnCompleted()
+ {
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/803028be/lang/cs/Org.Apache.REEF.Tests/Functional/Driver/TestDriver.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/Driver/TestDriver.cs
b/lang/cs/Org.Apache.REEF.Tests/Functional/Driver/TestDriver.cs
index 18aa59b..3de3ed2 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Driver/TestDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Driver/TestDriver.cs
@@ -19,6 +19,7 @@
using System.Collections.Generic;
using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Driver;
using Org.Apache.REEF.Driver.Bridge;
using Org.Apache.REEF.Driver.Defaults;
using Org.Apache.REEF.Tang.Interface;
@@ -46,15 +47,15 @@ namespace Org.Apache.REEF.Tests.Functional.Driver
/// This is to test DriverTestStartHandler. No evaluator and tasks are
involked.
/// </summary>
[TestMethod, Priority(1), TestCategory("FunctionalGated")]
- [Description("Test DriverTestStartHandler. No evaluator and tasks are
involked")]
+ [Description("Test DriverTestStartHandler. No evaluator and tasks are
invoked")]
[DeploymentItem(@".")]
[Timeout(180 * 1000)]
public void TestDriverStart()
{
- IConfiguration driverConfig =
DriverBridgeConfiguration.ConfigurationModule
- .Set(DriverBridgeConfiguration.OnDriverStarted,
GenericType<DriverTestStartHandler>.Class)
- .Set(DriverBridgeConfiguration.CustomTraceListeners,
GenericType<DefaultCustomTraceListener>.Class)
- .Set(DriverBridgeConfiguration.CustomTraceLevel,
Level.Info.ToString())
+ IConfiguration driverConfig =
DriverConfiguration.ConfigurationModule
+ .Set(DriverConfiguration.OnDriverStarted,
GenericType<DriverTestStartHandler>.Class)
+ .Set(DriverConfiguration.CustomTraceListeners,
GenericType<DefaultCustomTraceListener>.Class)
+ .Set(DriverConfiguration.CustomTraceLevel, Level.Info.ToString())
.Build();
HashSet<string> appDlls = new HashSet<string>();
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/803028be/lang/cs/Org.Apache.REEF.Tests/Functional/Group/BroadcastReduceTest.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.Tests/Functional/Group/BroadcastReduceTest.cs
b/lang/cs/Org.Apache.REEF.Tests/Functional/Group/BroadcastReduceTest.cs
index 7dc3423..6150d41 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Group/BroadcastReduceTest.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Group/BroadcastReduceTest.cs
@@ -27,6 +27,7 @@ using Org.Apache.REEF.Driver.Bridge;
using Org.Apache.REEF.Network.Examples.GroupCommunication;
using
Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDriverAndTasks;
using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Naming;
using Org.Apache.REEF.Network.NetworkService;
using Org.Apache.REEF.Tang.Implementations.Configuration;
using Org.Apache.REEF.Tang.Implementations.Tang;
@@ -71,13 +72,12 @@ namespace Org.Apache.REEF.Tests.Functional.Group
public void TestBroadcastAndReduce(bool runOnYarn, int numTasks)
{
IConfiguration driverConfig =
TangFactory.GetTang().NewConfigurationBuilder(
- DriverBridgeConfiguration.ConfigurationModule
- .Set(DriverBridgeConfiguration.OnDriverStarted,
GenericType<BroadcastReduceDriver>.Class)
- .Set(DriverBridgeConfiguration.OnEvaluatorAllocated,
GenericType<BroadcastReduceDriver>.Class)
- .Set(DriverBridgeConfiguration.OnEvaluatorRequested,
GenericType<BroadcastReduceDriver>.Class)
- .Set(DriverBridgeConfiguration.OnEvaluatorFailed,
GenericType<BroadcastReduceDriver>.Class)
- .Set(DriverBridgeConfiguration.OnContextActive,
GenericType<BroadcastReduceDriver>.Class)
- .Set(DriverBridgeConfiguration.CustomTraceLevel,
Level.Info.ToString())
+ DriverConfiguration.ConfigurationModule
+ .Set(DriverConfiguration.OnDriverStarted,
GenericType<BroadcastReduceDriver>.Class)
+ .Set(DriverConfiguration.OnEvaluatorAllocated,
GenericType<BroadcastReduceDriver>.Class)
+ .Set(DriverConfiguration.OnEvaluatorFailed,
GenericType<BroadcastReduceDriver>.Class)
+ .Set(DriverConfiguration.OnContextActive,
GenericType<BroadcastReduceDriver>.Class)
+ .Set(DriverConfiguration.CustomTraceLevel,
Level.Info.ToString())
.Build())
.BindNamedParameter<GroupTestConfig.NumIterations, int>(
GenericType<GroupTestConfig.NumIterations>.Class,
@@ -96,7 +96,14 @@ namespace Org.Apache.REEF.Tests.Functional.Group
.Build();
IConfiguration merged = Configurations.Merge(driverConfig,
groupCommDriverConfig);
-
+
+ IConfiguration taskConfig =
TangFactory.GetTang().NewConfigurationBuilder()
+
.BindSetEntry<DriverBridgeConfigurationOptions.SetOfAssemblies,
string>(typeof(MasterTask).Assembly.GetName().Name)
+
.BindSetEntry<DriverBridgeConfigurationOptions.SetOfAssemblies,
string>(typeof(NameClient).Assembly.GetName().Name)
+ .Build();
+
+ merged = Configurations.Merge(merged, taskConfig);
+
HashSet<string> appDlls = new HashSet<string>();
appDlls.Add(typeof(IDriver).Assembly.GetName().Name);
appDlls.Add(typeof(ITask).Assembly.GetName().Name);
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/803028be/lang/cs/Org.Apache.REEF.Tests/Functional/Group/PipelinedBroadcastReduceTest.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.Tests/Functional/Group/PipelinedBroadcastReduceTest.cs
b/lang/cs/Org.Apache.REEF.Tests/Functional/Group/PipelinedBroadcastReduceTest.cs
index fe098a8..583960e 100644
---
a/lang/cs/Org.Apache.REEF.Tests/Functional/Group/PipelinedBroadcastReduceTest.cs
+++
b/lang/cs/Org.Apache.REEF.Tests/Functional/Group/PipelinedBroadcastReduceTest.cs
@@ -27,6 +27,7 @@ using Org.Apache.REEF.Driver.Bridge;
using Org.Apache.REEF.Network.Examples.GroupCommunication;
using
Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastReduceDriverAndTasks;
using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Naming;
using Org.Apache.REEF.Network.NetworkService;
using Org.Apache.REEF.Tang.Implementations.Configuration;
using Org.Apache.REEF.Tang.Implementations.Tang;
@@ -70,13 +71,12 @@ namespace Org.Apache.REEF.Tests.Functional.Group
public void TestBroadcastAndReduce(bool runOnYarn, int numTasks)
{
IConfiguration driverConfig =
TangFactory.GetTang().NewConfigurationBuilder(
- DriverBridgeConfiguration.ConfigurationModule
- .Set(DriverBridgeConfiguration.OnDriverStarted,
GenericType<PipelinedBroadcastReduceDriver>.Class)
- .Set(DriverBridgeConfiguration.OnEvaluatorAllocated,
GenericType<PipelinedBroadcastReduceDriver>.Class)
- .Set(DriverBridgeConfiguration.OnEvaluatorRequested,
GenericType<PipelinedBroadcastReduceDriver>.Class)
- .Set(DriverBridgeConfiguration.OnEvaluatorFailed,
GenericType<PipelinedBroadcastReduceDriver>.Class)
- .Set(DriverBridgeConfiguration.OnContextActive,
GenericType<PipelinedBroadcastReduceDriver>.Class)
- .Set(DriverBridgeConfiguration.CustomTraceLevel,
Level.Info.ToString())
+ DriverConfiguration.ConfigurationModule
+ .Set(DriverConfiguration.OnDriverStarted,
GenericType<PipelinedBroadcastReduceDriver>.Class)
+ .Set(DriverConfiguration.OnEvaluatorAllocated,
GenericType<PipelinedBroadcastReduceDriver>.Class)
+ .Set(DriverConfiguration.OnEvaluatorFailed,
GenericType<PipelinedBroadcastReduceDriver>.Class)
+ .Set(DriverConfiguration.OnContextActive,
GenericType<PipelinedBroadcastReduceDriver>.Class)
+ .Set(DriverConfiguration.CustomTraceLevel,
Level.Info.ToString())
.Build())
.BindNamedParameter<GroupTestConfig.NumIterations, int>(
GenericType<GroupTestConfig.NumIterations>.Class,
@@ -98,7 +98,14 @@ namespace Org.Apache.REEF.Tests.Functional.Group
.Build();
IConfiguration merged = Configurations.Merge(driverConfig,
groupCommDriverConfig);
-
+
+ IConfiguration taskConfig =
TangFactory.GetTang().NewConfigurationBuilder()
+
.BindSetEntry<DriverBridgeConfigurationOptions.SetOfAssemblies,
string>(typeof(PipelinedMasterTask).Assembly.GetName().Name)
+
.BindSetEntry<DriverBridgeConfigurationOptions.SetOfAssemblies,
string>(typeof(NameClient).Assembly.GetName().Name)
+ .Build();
+
+ merged = Configurations.Merge(merged, taskConfig);
+
HashSet<string> appDlls = new HashSet<string>();
appDlls.Add(typeof(IDriver).Assembly.GetName().Name);
appDlls.Add(typeof(ITask).Assembly.GetName().Name);
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/803028be/lang/cs/Org.Apache.REEF.Tests/Functional/Group/ScatterReduceTest.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.Tests/Functional/Group/ScatterReduceTest.cs
b/lang/cs/Org.Apache.REEF.Tests/Functional/Group/ScatterReduceTest.cs
index 868b3a1..5b4e28b 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Group/ScatterReduceTest.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Group/ScatterReduceTest.cs
@@ -27,6 +27,7 @@ using Org.Apache.REEF.Driver.Bridge;
using Org.Apache.REEF.Network.Examples.GroupCommunication;
using
Org.Apache.REEF.Network.Examples.GroupCommunication.ScatterReduceDriverAndTasks;
using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Naming;
using Org.Apache.REEF.Network.NetworkService;
using Org.Apache.REEF.Tang.Implementations.Configuration;
using Org.Apache.REEF.Tang.Implementations.Tang;
@@ -71,13 +72,12 @@ namespace Org.Apache.REEF.Tests.Functional.Group
public void TestScatterAndReduce(bool runOnYarn, int numTasks)
{
IConfiguration driverConfig =
TangFactory.GetTang().NewConfigurationBuilder(
- DriverBridgeConfiguration.ConfigurationModule
- .Set(DriverBridgeConfiguration.OnDriverStarted,
GenericType<ScatterReduceDriver>.Class)
- .Set(DriverBridgeConfiguration.OnEvaluatorAllocated,
GenericType<ScatterReduceDriver>.Class)
- .Set(DriverBridgeConfiguration.OnEvaluatorRequested,
GenericType<ScatterReduceDriver>.Class)
- .Set(DriverBridgeConfiguration.OnEvaluatorFailed,
GenericType<ScatterReduceDriver>.Class)
- .Set(DriverBridgeConfiguration.OnContextActive,
GenericType<ScatterReduceDriver>.Class)
- .Set(DriverBridgeConfiguration.CustomTraceLevel,
Level.Info.ToString())
+ DriverConfiguration.ConfigurationModule
+ .Set(DriverConfiguration.OnDriverStarted,
GenericType<ScatterReduceDriver>.Class)
+ .Set(DriverConfiguration.OnEvaluatorAllocated,
GenericType<ScatterReduceDriver>.Class)
+ .Set(DriverConfiguration.OnEvaluatorFailed,
GenericType<ScatterReduceDriver>.Class)
+ .Set(DriverConfiguration.OnContextActive,
GenericType<ScatterReduceDriver>.Class)
+ .Set(DriverConfiguration.CustomTraceLevel,
Level.Info.ToString())
.Build())
.BindNamedParameter<GroupTestConfig.NumEvaluators, int>(
GenericType<GroupTestConfig.NumEvaluators>.Class,
@@ -94,6 +94,13 @@ namespace Org.Apache.REEF.Tests.Functional.Group
IConfiguration merged = Configurations.Merge(driverConfig,
groupCommDriverConfig);
+ IConfiguration taskConfig = TangFactory.GetTang().NewConfigurationBuilder()
+ .BindSetEntry<DriverBridgeConfigurationOptions.SetOfAssemblies, string>(typeof(MasterTask).Assembly.GetName().Name)
+ .BindSetEntry<DriverBridgeConfigurationOptions.SetOfAssemblies, string>(typeof(NameClient).Assembly.GetName().Name)
+ .Build();
+
+ merged = Configurations.Merge(merged, taskConfig);
+
HashSet<string> appDlls = new HashSet<string>();
appDlls.Add(typeof(IDriver).Assembly.GetName().Name);
appDlls.Add(typeof(ITask).Assembly.GetName().Name);
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/803028be/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs
b/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs
index 765944c..87ce4db 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/ML/KMeans/TestKMeans.cs
@@ -27,6 +27,7 @@ using Org.Apache.REEF.Common.Tasks;
using Org.Apache.REEF.Driver.Bridge;
using Org.Apache.REEF.Examples.MachineLearning.KMeans;
using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.Naming;
using Org.Apache.REEF.Network.NetworkService;
using Org.Apache.REEF.Tang.Implementations.Configuration;
using Org.Apache.REEF.Tang.Implementations.Tang;
@@ -149,13 +150,12 @@ namespace Org.Apache.REEF.Tests.Functional.ML.KMeans
string Identifier = "KMeansDriverId";
IConfiguration driverConfig =
TangFactory.GetTang().NewConfigurationBuilder(
- DriverBridgeConfiguration.ConfigurationModule
- .Set(DriverBridgeConfiguration.OnDriverStarted,
GenericType<KMeansDriverHandlers>.Class)
- .Set(DriverBridgeConfiguration.OnEvaluatorAllocated,
GenericType<KMeansDriverHandlers>.Class)
- .Set(DriverBridgeConfiguration.OnEvaluatorRequested,
GenericType<KMeansDriverHandlers>.Class)
- .Set(DriverBridgeConfiguration.OnContextActive,
GenericType<KMeansDriverHandlers>.Class)
- .Set(DriverBridgeConfiguration.CommandLineArguments,
"DataFile:" + _dataFile)
- .Set(DriverBridgeConfiguration.CustomTraceLevel,
Level.Info.ToString())
+ Org.Apache.REEF.Driver.DriverConfiguration.ConfigurationModule
+ .Set(Org.Apache.REEF.Driver.DriverConfiguration.OnDriverStarted, GenericType<KMeansDriverHandlers>.Class)
+ .Set(Org.Apache.REEF.Driver.DriverConfiguration.OnEvaluatorAllocated, GenericType<KMeansDriverHandlers>.Class)
+ .Set(Org.Apache.REEF.Driver.DriverConfiguration.OnContextActive, GenericType<KMeansDriverHandlers>.Class)
+ .Set(Org.Apache.REEF.Driver.DriverConfiguration.CommandLineArguments, "DataFile:" + _dataFile)
+ .Set(Org.Apache.REEF.Driver.DriverConfiguration.CustomTraceLevel, Level.Info.ToString())
.Build())
.BindIntNamedParam<NumPartitions>(Partitions.ToString())
.Build();
@@ -168,7 +168,14 @@ namespace Org.Apache.REEF.Tests.Functional.ML.KMeans
.BindIntNamedParam<GroupCommConfigurationOptions.NumberOfTasks>(totalEvaluators.ToString())
.Build();
- return Configurations.Merge(driverConfig,
groupCommunicationDriverConfig);
+ IConfiguration merged = Configurations.Merge(driverConfig,
groupCommunicationDriverConfig);
+
+ IConfiguration taskConfig = TangFactory.GetTang().NewConfigurationBuilder()
+ .BindSetEntry<DriverBridgeConfigurationOptions.SetOfAssemblies, string>(typeof(KMeansMasterTask).Assembly.GetName().Name)
+ .BindSetEntry<DriverBridgeConfigurationOptions.SetOfAssemblies, string>(typeof(NameClient).Assembly.GetName().Name)
+ .Build();
+
+ return Configurations.Merge(merged, taskConfig);
}
private HashSet<string> AssembliesToCopy()
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/803028be/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/MessageDriver.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/MessageDriver.cs
b/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/MessageDriver.cs
index a08712b..2b9e532 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/MessageDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/MessageDriver.cs
@@ -36,23 +36,25 @@ using IRunningTask =
Org.Apache.REEF.Driver.Task.IRunningTask;
namespace Org.Apache.REEF.Tests.Functional.Messaging
{
- public class MessageDriver : IStartHandler,
IObserver<IAllocatedEvaluator>, IObserver<IEvaluatorRequestor>,
IObserver<ITaskMessage>, IObserver<IRunningTask>
+ public class MessageDriver :
+ IObserver<IAllocatedEvaluator>,
+ IObserver<ITaskMessage>,
+ IObserver<IRunningTask>,
+ IObserver<IDriverStarted>
{
public const int NumerOfEvaluator = 1;
public const string Message = "MESSAGE::DRIVER";
private static readonly Logger LOGGER =
Logger.GetLogger(typeof(MessageDriver));
+ private readonly IEvaluatorRequestor _evaluatorRequestor;
[Inject]
- public MessageDriver()
+ public MessageDriver(IEvaluatorRequestor evaluatorRequestor)
{
CreateClassHierarchy();
- Identifier = "TaskMessagingStartHandler";
+ _evaluatorRequestor = evaluatorRequestor;
}
-
- public string Identifier { get; set; }
-
public void OnNext(IAllocatedEvaluator eval)
{
string taskId = "Task_" + eval.Id;
@@ -71,12 +73,6 @@ namespace Org.Apache.REEF.Tests.Functional.Messaging
eval.SubmitContextAndTask(contextConfiguration, taskConfiguration);
}
- public void OnNext(IEvaluatorRequestor evalutorRequestor)
- {
- EvaluatorRequest request = new EvaluatorRequest(NumerOfEvaluator,
512, 2, "WonderlandRack", "TaskMessagingEvaluator");
- evalutorRequestor.Submit(request);
- }
-
public void OnNext(ITaskMessage taskMessage)
{
string msgReceived =
ByteUtilities.ByteArrarysToString(taskMessage.Message);
@@ -95,6 +91,12 @@ namespace Org.Apache.REEF.Tests.Functional.Messaging
runningTask.Send(ByteUtilities.StringToByteArrays(Message));
}
+ public void OnNext(IDriverStarted value)
+ {
+ EvaluatorRequest request = new EvaluatorRequest(NumerOfEvaluator,
512, 2, "WonderlandRack", "TaskMessagingEvaluator");
+ _evaluatorRequestor.Submit(request);
+ }
+
public void OnCompleted()
{
throw new NotImplementedException();
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/803028be/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/TestTaskMessage.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/TestTaskMessage.cs
b/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/TestTaskMessage.cs
index 7a0b59a..063e0ff 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/TestTaskMessage.cs
+++ b/lang/cs/Org.Apache.REEF.Tests/Functional/Messaging/TestTaskMessage.cs
@@ -19,8 +19,12 @@
using System.Collections.Generic;
using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Driver;
using Org.Apache.REEF.Driver.Bridge;
using Org.Apache.REEF.Driver.Defaults;
+using Org.Apache.REEF.Network.Naming;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Tang.Implementations.Tang;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Tang.Util;
using Org.Apache.REEF.Utilities.Logging;
@@ -53,21 +57,27 @@ namespace Org.Apache.REEF.Tests.Functional.Messaging
[Timeout(180 * 1000)]
public void TestSendTaskMessage()
{
- IConfiguration driverConfig =
DriverBridgeConfiguration.ConfigurationModule
- .Set(DriverBridgeConfiguration.OnDriverStarted,
GenericType<MessageDriver>.Class)
- .Set(DriverBridgeConfiguration.OnEvaluatorAllocated,
GenericType<MessageDriver>.Class)
- .Set(DriverBridgeConfiguration.OnTaskMessage,
GenericType<MessageDriver>.Class)
- .Set(DriverBridgeConfiguration.OnTaskRunning,
GenericType<MessageDriver>.Class)
- .Set(DriverBridgeConfiguration.OnEvaluatorRequested,
GenericType<MessageDriver>.Class)
- .Set(DriverBridgeConfiguration.CustomTraceListeners,
GenericType<DefaultCustomTraceListener>.Class)
- .Set(DriverBridgeConfiguration.CustomTraceLevel,
Level.Info.ToString())
+ IConfiguration driverConfig =
DriverConfiguration.ConfigurationModule
+ .Set(DriverConfiguration.OnDriverStarted,
GenericType<MessageDriver>.Class)
+ .Set(DriverConfiguration.OnEvaluatorAllocated,
GenericType<MessageDriver>.Class)
+ .Set(DriverConfiguration.OnTaskMessage,
GenericType<MessageDriver>.Class)
+ .Set(DriverConfiguration.OnTaskRunning,
GenericType<MessageDriver>.Class)
+ .Set(DriverConfiguration.CustomTraceListeners,
GenericType<DefaultCustomTraceListener>.Class)
+ .Set(DriverConfiguration.CustomTraceLevel, Level.Info.ToString())
.Build();
+ IConfiguration taskConfig = TangFactory.GetTang().NewConfigurationBuilder()
+ .BindSetEntry<DriverBridgeConfigurationOptions.SetOfAssemblies, string>(typeof(MessageTask).Assembly.GetName().Name)
+ .BindSetEntry<DriverBridgeConfigurationOptions.SetOfAssemblies, string>(typeof(NameClient).Assembly.GetName().Name)
+ .Build();
+
+ IConfiguration merged = Configurations.Merge(driverConfig,
taskConfig);
+
HashSet<string> appDlls = new HashSet<string>();
appDlls.Add(typeof(MessageDriver).Assembly.GetName().Name);
appDlls.Add(typeof(MessageTask).Assembly.GetName().Name);
- TestRun(appDlls, driverConfig);
+ TestRun(appDlls, merged);
ValidateSuccessForLocalRuntime(MessageDriver.NumerOfEvaluator);
}
}