[REEF-260]: Make the GroupCommunicationLayer memory efficient by introducing streaming and building on top of StreamingNetworkService Layer This addressed the issue by * Adding Writable GroupCommunicationMessage. There are two classes one GeneralGroupCommunicationMessage which is independent of message type and is visible to Network service and other type specific communication message visible to operators. * Making the current operators Writable.
JIRA: [REEF-260](https://issues.apache.org/jira/browse/REEF-260) This CLoses #234 Author: Dhruv <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/1d200334 Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/1d200334 Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/1d200334 Branch: refs/heads/master Commit: 1d2003346d5dd6a4a9cf39c46178bf0f1f755c76 Parents: b3c1517 Author: Dhruv <[email protected]> Authored: Tue Jun 23 12:34:25 2015 -0700 Committer: Julia Wang <[email protected]> Committed: Fri Jun 26 15:14:22 2015 -0700 ---------------------------------------------------------------------- .../KMeans/KMeansDriverHandlers.cs | 22 +- .../Run.cs | 3 +- .../BroadcastReduceDriver.cs | 14 +- .../GroupCommunication/GroupTestConfig.cs | 4 +- .../PipelinedBroadcastReduceDriver.cs | 39 ++- .../ScatterReduceDriver.cs | 11 +- .../GroupCommunicationTests.cs | 253 +++++++++---------- .../GroupCommunicationTreeTopologyTests.cs | 62 +++-- .../GroupCommunication/StreamingCodecTests.cs | 11 +- .../Group/Codec/GcmMessageProto.cs | 70 ----- .../Codec/GroupCommunicationMessageCodec.cs | 70 ----- .../CodecToStreamingCodecConfiguration.cs | 44 ++++ .../Group/Driver/ICommunicationGroupDriver.cs | 5 - .../Driver/Impl/CommunicationGroupDriver.cs | 20 +- .../Impl/GeneralGroupCommunicationMessage.cs | 114 +++++++++ .../Group/Driver/Impl/GroupCommDriver.cs | 41 ++- .../Driver/Impl/GroupCommunicationMessage.cs | 172 ++++++++++--- .../Group/Operators/Impl/BroadcastReceiver.cs | 4 +- .../Group/Operators/Impl/BroadcastSender.cs | 4 +- .../Group/Operators/Impl/ReduceReceiver.cs | 4 +- .../Group/Operators/Impl/ReduceSender.cs | 4 +- .../Group/Operators/Impl/ScatterReceiver.cs | 4 +- .../Group/Operators/Impl/ScatterSender.cs | 4 +- .../Group/Operators/Impl/Sender.cs | 12 +- .../Task/ICommunicationGroupNetworkObserver.cs | 8 +- .../Group/Task/IGroupCommNetworkObserver.cs | 6 +- .../Impl/CommunicationGroupNetworkObserver.cs | 28 +- .../Group/Task/Impl/GroupCommClient.cs | 20 +- .../Group/Task/Impl/GroupCommNetworkObserver.cs | 22 +- .../Group/Task/Impl/NodeStruct.cs | 15 +- .../Group/Task/Impl/OperatorTopology.cs | 155 ++++++------ .../Group/Topology/FlatTopology.cs | 2 - .../Group/Topology/ITopology.cs | 1 - .../Group/Topology/TreeTopology.cs | 6 - .../NetworkService/WritableNsMessage.cs | 8 +- .../Org.Apache.REEF.Network.csproj | 5 +- .../StringStreamingCodec.cs | 82 ++++++ 37 files changed, 772 insertions(+), 577 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs index f4c1ca8..ae22701 100644 --- a/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs +++ b/lang/cs/Org.Apache.REEF.Examples/MachineLearning/KMeans/KMeansDriverHandlers.cs @@ -33,12 +33,10 @@ using Org.Apache.REEF.Examples.MachineLearning.KMeans.codecs; using Org.Apache.REEF.Network.Group.Config; using Org.Apache.REEF.Network.Group.Driver; using Org.Apache.REEF.Network.Group.Driver.Impl; -using Org.Apache.REEF.Network.Group.Operators.Impl; using Org.Apache.REEF.Network.Group.Pipelining.Impl; using Org.Apache.REEF.Network.NetworkService; using Org.Apache.REEF.Network.NetworkService.Codec; using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Tang.Formats; using Org.Apache.REEF.Tang.Implementations.Configuration; using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Tang.Interface; @@ -65,6 +63,10 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans private readonly IGroupCommDriver _groupCommDriver; private readonly ICommunicationGroupDriver _commGroup; private readonly TaskStarter _groupCommTaskStarter; + private readonly IConfiguration _centroidCodecConf; + private readonly IConfiguration _controlMessageCodecConf; + private readonly IConfiguration _processedResultsCodecConf; + [Inject] public KMeansDriverHandlers([Parameter(typeof(NumPartitions))] int numPartitions, GroupCommDriver groupCommDriver) @@ -83,7 +85,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans _groupCommDriver = groupCommDriver; - IConfiguration conf1 = CodecConfiguration<Centroids>.Conf + _centroidCodecConf = CodecToStreamingCodecConfiguration<Centroids>.Conf .Set(CodecConfiguration<Centroids>.Codec, GenericType<CentroidsCodec>.Class) .Build(); @@ -91,7 +93,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans .Set(PipelineDataConverterConfiguration<Centroids>.DataConverter, GenericType<DefaultPipelineDataConverter<Centroids>>.Class) .Build(); - IConfiguration conf2 = CodecConfiguration<ControlMessage>.Conf + _controlMessageCodecConf = CodecToStreamingCodecConfiguration<ControlMessage>.Conf .Set(CodecConfiguration<ControlMessage>.Codec, GenericType<ControlMessageCodec>.Class) .Build(); @@ -99,7 +101,7 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans .Set(PipelineDataConverterConfiguration<ControlMessage>.DataConverter, GenericType<DefaultPipelineDataConverter<ControlMessage>>.Class) .Build(); - IConfiguration conf3 = CodecConfiguration<ProcessedResults>.Conf + _processedResultsCodecConf = CodecToStreamingCodecConfiguration<ProcessedResults>.Conf .Set(CodecConfiguration<ProcessedResults>.Codec, GenericType<ProcessedResultsCodec>.Class) .Build(); @@ -112,9 +114,9 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans .Build(); _commGroup = _groupCommDriver.DefaultGroup - .AddBroadcast<Centroids>(Constants.CentroidsBroadcastOperatorName, Constants.MasterTaskId, TopologyTypes.Flat, conf1, dataConverterConfig1) - .AddBroadcast<ControlMessage>(Constants.ControlMessageBroadcastOperatorName, Constants.MasterTaskId, TopologyTypes.Flat, conf2, dataConverterConfig2) - .AddReduce<ProcessedResults>(Constants.MeansReduceOperatorName, Constants.MasterTaskId, TopologyTypes.Flat, conf3, reduceFunctionConfig, dataConverterConfig3) + .AddBroadcast<Centroids>(Constants.CentroidsBroadcastOperatorName, Constants.MasterTaskId, TopologyTypes.Flat, dataConverterConfig1) + .AddBroadcast<ControlMessage>(Constants.ControlMessageBroadcastOperatorName, Constants.MasterTaskId, TopologyTypes.Flat, dataConverterConfig2) + .AddReduce<ProcessedResults>(Constants.MeansReduceOperatorName, Constants.MasterTaskId, TopologyTypes.Flat, reduceFunctionConfig, dataConverterConfig3) .Build(); _groupCommTaskStarter = new TaskStarter(_groupCommDriver, _totalEvaluators); @@ -149,10 +151,10 @@ namespace Org.Apache.REEF.Examples.MachineLearning.KMeans partitionNum = _partitionInex; _partitionInex++; } - } + } IConfiguration gcServiceConfiguration = _groupCommDriver.GetServiceConfiguration(); - + gcServiceConfiguration = Configurations.Merge(gcServiceConfiguration, _centroidCodecConf, _controlMessageCodecConf, _processedResultsCodecConf); IConfiguration commonServiceConfiguration = TangFactory.GetTang().NewConfigurationBuilder(gcServiceConfiguration) .BindNamedParameter<DataPartitionCache.PartitionIndex, int>(GenericType<DataPartitionCache.PartitionIndex>.Class, partitionNum.ToString(CultureInfo.InvariantCulture)) .BindNamedParameter<KMeansConfiguratioinOptions.ExecutionDirectory, string>(GenericType<KMeansConfiguratioinOptions.ExecutionDirectory>.Class, _executionDirectory) http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network.Examples.Client/Run.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples.Client/Run.cs b/lang/cs/Org.Apache.REEF.Network.Examples.Client/Run.cs index e4d10da..639335b 100644 --- a/lang/cs/Org.Apache.REEF.Network.Examples.Client/Run.cs +++ b/lang/cs/Org.Apache.REEF.Network.Examples.Client/Run.cs @@ -34,7 +34,8 @@ namespace Org.Apache.REEF.Network.Examples.Client int startPort = 8900; int portRange = 1000; string testToRun = "RunBroadcastAndReduce"; - + testToRun = testToRun.ToLower(); + if (args != null) { if (args.Length > 0) http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs index 1d91e63..85ae3c0 100644 --- a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs +++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs @@ -27,6 +27,7 @@ using Org.Apache.REEF.Driver; using Org.Apache.REEF.Driver.Bridge; using Org.Apache.REEF.Driver.Context; using Org.Apache.REEF.Driver.Evaluator; +using Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs; using Org.Apache.REEF.Network.Group.Config; using Org.Apache.REEF.Network.Group.Driver; using Org.Apache.REEF.Network.Group.Driver.Impl; @@ -57,6 +58,7 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri private readonly ICommunicationGroupDriver _commGroup; private readonly TaskStarter _groupCommTaskStarter; private readonly IConfiguration _tcpPortProviderConfig; + private readonly IConfiguration _codecConfig; [Inject] public BroadcastReduceDriver( @@ -78,8 +80,9 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri portRange.ToString(CultureInfo.InvariantCulture)) .Build(); - IConfiguration codecConfig = CodecConfiguration<int>.Conf - .Set(CodecConfiguration<int>.Codec, GenericType<IntCodec>.Class) + + _codecConfig = StreamingCodecConfiguration<int>.Conf + .Set(StreamingCodecConfiguration<int>.Codec, GenericType<IntStreamingCodec>.Class) .Build(); IConfiguration reduceFunctionConfig = ReduceFunctionConfiguration<int>.Conf @@ -95,13 +98,11 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri GroupTestConstants.BroadcastOperatorName, GroupTestConstants.MasterTaskId, TopologyTypes.Tree, - codecConfig, dataConverterConfig) .AddReduce<int>( GroupTestConstants.ReduceOperatorName, GroupTestConstants.MasterTaskId, TopologyTypes.Tree, - codecConfig, reduceFunctionConfig, dataConverterConfig) .Build(); @@ -123,12 +124,14 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri { IConfiguration contextConf = _groupCommDriver.GetContextConfiguration(); IConfiguration serviceConf = _groupCommDriver.GetServiceConfiguration(); - serviceConf = Configurations.Merge(serviceConf, _tcpPortProviderConfig); + serviceConf = Configurations.Merge(serviceConf, _tcpPortProviderConfig, _codecConfig); allocatedEvaluator.SubmitContextAndService(contextConf, serviceConf); } public void OnNext(IActiveContext activeContext) { + IConfiguration serviceConfig = _groupCommDriver.GetServiceConfiguration(); + if (_groupCommDriver.IsMasterTaskContext(activeContext)) { // Configure Master Task @@ -190,7 +193,6 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDri clrDlls.Add(typeof(BroadcastReduceDriver).Assembly.GetName().Name); clrDlls.Add(typeof(INameClient).Assembly.GetName().Name); clrDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name); - ClrHandlerHelper.GenerateClassHierarchy(clrDlls); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/GroupTestConfig.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/GroupTestConfig.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/GroupTestConfig.cs index e329788..8a53dc8 100644 --- a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/GroupTestConfig.cs +++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/GroupTestConfig.cs @@ -43,12 +43,12 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication { } - [NamedParameter("Size of the array")] + [NamedParameter(Documentation = "Size of the array", DefaultValue = "6")] public class ArraySize : Name<int> { } - [NamedParameter("Chunk size for pipelining")] + [NamedParameter(Documentation = "Chunk size for pipelining", DefaultValue = "2")] public class ChunkSize : Name<int> { } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs index 4cf8486..f32de61 100644 --- a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs +++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs @@ -27,6 +27,8 @@ using Org.Apache.REEF.Driver; using Org.Apache.REEF.Driver.Bridge; using Org.Apache.REEF.Driver.Context; using Org.Apache.REEF.Driver.Evaluator; +using Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs; +using Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDriverAndTasks; using Org.Apache.REEF.Network.Group.Config; using Org.Apache.REEF.Network.Group.Driver; using Org.Apache.REEF.Network.Group.Driver.Impl; @@ -56,6 +58,7 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR private readonly int _numEvaluators; private readonly int _numIterations; private readonly IConfiguration _tcpPortProviderConfig; + private readonly IConfiguration _codecConfig; [Inject] public PipelinedBroadcastReduceDriver( @@ -81,8 +84,8 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR portRange.ToString(CultureInfo.InvariantCulture)) .Build(); - var codecConfig = CodecConfiguration<int[]>.Conf - .Set(CodecConfiguration<int[]>.Codec, GenericType<IntArrayCodec>.Class) + _codecConfig = StreamingCodecConfiguration<int[]>.Conf + .Set(StreamingCodecConfiguration<int[]>.Codec, GenericType<IntArrayStreamingCodec>.Class) .Build(); var reduceFunctionConfig = ReduceFunctionConfiguration<int[]>.Conf @@ -106,13 +109,11 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR GroupTestConstants.BroadcastOperatorName, GroupTestConstants.MasterTaskId, TopologyTypes.Tree, - codecConfig, dataConverterConfig) .AddReduce<int[]>( GroupTestConstants.ReduceOperatorName, GroupTestConstants.MasterTaskId, TopologyTypes.Tree, - codecConfig, reduceFunctionConfig, dataConverterConfig) .Build(); @@ -121,7 +122,21 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR CreateClassHierarchy(); } + public string Identifier { get; set; } + + public void OnNext(IEvaluatorRequestor evaluatorRequestor) + { + EvaluatorRequest request = new EvaluatorRequest(_numEvaluators, 512, 2, "WonderlandRack", "BroadcastEvaluator"); + evaluatorRequestor.Submit(request); + } + public void OnNext(IAllocatedEvaluator allocatedEvaluator) + { + IConfiguration contextConf = _groupCommDriver.GetContextConfiguration(); + IConfiguration serviceConf = _groupCommDriver.GetServiceConfiguration(); + serviceConf = Configurations.Merge(serviceConf, _codecConfig, _tcpPortProviderConfig); + allocatedEvaluator.SubmitContextAndService(contextConf, serviceConf); + } public void OnNext(IActiveContext activeContext) { if (_groupCommDriver.IsMasterTaskContext(activeContext)) @@ -173,20 +188,6 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR } } - public void OnNext(IAllocatedEvaluator allocatedEvaluator) - { - var contextConf = _groupCommDriver.GetContextConfiguration(); - var serviceConf = _groupCommDriver.GetServiceConfiguration(); - serviceConf = Configurations.Merge(serviceConf, _tcpPortProviderConfig); - allocatedEvaluator.SubmitContextAndService(contextConf, serviceConf); - } - - public void OnNext(IEvaluatorRequestor evaluatorRequestor) - { - var request = new EvaluatorRequest(_numEvaluators, 512, 2, "WonderlandRack", "BroadcastEvaluator"); - evaluatorRequestor.Submit(request); - } - public void OnError(Exception error) { } @@ -199,8 +200,6 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastR { } - public string Identifier { get; set; } - private void CreateClassHierarchy() { var clrDlls = new HashSet<string>(); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs index ba812e3..8a9d3e2 100644 --- a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs +++ b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs @@ -27,6 +27,7 @@ using Org.Apache.REEF.Driver; using Org.Apache.REEF.Driver.Bridge; using Org.Apache.REEF.Driver.Context; using Org.Apache.REEF.Driver.Evaluator; +using Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs; using Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDriverAndTasks; using Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastReduceDriverAndTasks; using Org.Apache.REEF.Network.Group.Config; @@ -55,6 +56,7 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.ScatterReduceDrive private readonly IGroupCommDriver _groupCommDriver; private readonly ICommunicationGroupDriver _commGroup; private readonly TaskStarter _groupCommTaskStarter; + private readonly IConfiguration _codecConfig; [Inject] public ScatterReduceDriver( @@ -65,9 +67,9 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.ScatterReduceDrive _numEvaluators = numEvaluators; _groupCommDriver = groupCommDriver; - IConfiguration codecConfig = CodecConfiguration<int>.Conf - .Set(CodecConfiguration<int>.Codec, GenericType<IntCodec>.Class) - .Build(); + _codecConfig = StreamingCodecConfiguration<int>.Conf + .Set(StreamingCodecConfiguration<int>.Codec, GenericType<IntStreamingCodec>.Class) + .Build(); IConfiguration reduceFunctionConfig = ReduceFunctionConfiguration<int>.Conf .Set(ReduceFunctionConfiguration<int>.ReduceFunction, GenericType<SumFunction>.Class) @@ -82,13 +84,11 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.ScatterReduceDrive GroupTestConstants.ScatterOperatorName, GroupTestConstants.MasterTaskId, TopologyTypes.Tree, - codecConfig, dataConverterConfig) .AddReduce<int>( GroupTestConstants.ReduceOperatorName, GroupTestConstants.MasterTaskId, TopologyTypes.Tree, - codecConfig, reduceFunctionConfig, dataConverterConfig) @@ -111,6 +111,7 @@ namespace Org.Apache.REEF.Network.Examples.GroupCommunication.ScatterReduceDrive { IConfiguration contextConf = _groupCommDriver.GetContextConfiguration(); IConfiguration serviceConf = _groupCommDriver.GetServiceConfiguration(); + serviceConf = Configurations.Merge(serviceConf, _codecConfig); allocatedEvaluator.SubmitContextAndService(contextConf, serviceConf); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs index 53b8cb5..f2bec7c 100644 --- a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs +++ b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTests.cs @@ -21,17 +21,17 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Globalization; +using System.IO; using System.Linq; using System.Net; using System.Reactive; -using System.Text; +using System.Threading; +using System.Threading.Tasks; using Microsoft.VisualStudio.TestTools.UnitTesting; using Org.Apache.REEF.Common.Io; using Org.Apache.REEF.Common.Tasks; -using Org.Apache.REEF.Examples.MachineLearning.KMeans; -using Org.Apache.REEF.Examples.MachineLearning.KMeans.codecs; +using Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs; using Org.Apache.REEF.Network.Examples.GroupCommunication; -using Org.Apache.REEF.Network.Group.Codec; using Org.Apache.REEF.Network.Group.Config; using Org.Apache.REEF.Network.Group.Driver; using Org.Apache.REEF.Network.Group.Driver.Impl; @@ -43,7 +43,7 @@ using Org.Apache.REEF.Network.Group.Task; using Org.Apache.REEF.Network.Group.Topology; using Org.Apache.REEF.Network.Naming; using Org.Apache.REEF.Network.NetworkService; -using Org.Apache.REEF.Network.NetworkService.Codec; +using Org.Apache.REEF.Network.StreamingCodec; using Org.Apache.REEF.Network.Tests.NamingService; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Tang.Formats; @@ -53,7 +53,6 @@ using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Tang.Util; using Org.Apache.REEF.Wake.Remote; using Org.Apache.REEF.Wake.Remote.Impl; -using Constants = Org.Apache.REEF.Common.Constants; namespace Org.Apache.REEF.Network.Tests.GroupCommunication { @@ -66,34 +65,41 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication using (var nameServer = NameServerTests.BuildNameServer()) { IPEndPoint endpoint = nameServer.LocalEndpoint; - BlockingCollection<GroupCommunicationMessage> messages1 = new BlockingCollection<GroupCommunicationMessage>(); - BlockingCollection<GroupCommunicationMessage> messages2 = new BlockingCollection<GroupCommunicationMessage>(); - var handler1 = Observer.Create<NsMessage<GroupCommunicationMessage>>( - msg => messages1.Add(msg.Data.First())); - var handler2 = Observer.Create<NsMessage<GroupCommunicationMessage>>( - msg => messages2.Add(msg.Data.First())); + BlockingCollection<GeneralGroupCommunicationMessage> messages1 = + new BlockingCollection<GeneralGroupCommunicationMessage>(); + BlockingCollection<GeneralGroupCommunicationMessage> messages2 = + new BlockingCollection<GeneralGroupCommunicationMessage>(); - var networkService1 = BuildNetworkService(endpoint, handler1); - var networkService2 = BuildNetworkService(endpoint, handler2); + var handler1 = + Observer.Create<WritableNsMessage<GeneralGroupCommunicationMessage>>(msg => messages1.Add(msg.Data.First())); + var handler2 = + Observer.Create<WritableNsMessage<GeneralGroupCommunicationMessage>>(msg => messages2.Add(msg.Data.First())); + var networkServiceInjector1 = BuildNetworkServiceInjector(endpoint, handler1); + var networkServiceInjector2 = BuildNetworkServiceInjector(endpoint, handler2); + + var networkService1 = networkServiceInjector1.GetInstance< + WritableNetworkService<GeneralGroupCommunicationMessage>>(); + var networkService2 = networkServiceInjector2.GetInstance< + WritableNetworkService<GeneralGroupCommunicationMessage>>(); networkService1.Register(new StringIdentifier("id1")); networkService2.Register(new StringIdentifier("id2")); - Sender sender1 = new Sender(networkService1, new StringIdentifierFactory()); - Sender sender2 = new Sender(networkService2, new StringIdentifierFactory()); + Sender sender1 = networkServiceInjector1.GetInstance<Sender>(); + Sender sender2 = networkServiceInjector2.GetInstance<Sender>(); - sender1.Send(CreateGcm("abc", "id1", "id2")); - sender1.Send(CreateGcm("def", "id1", "id2")); + sender1.Send(CreateGcmStringType("abc", "id1", "id2")); + sender1.Send(CreateGcmStringType("def", "id1", "id2")); + sender2.Send(CreateGcmStringType("ghi", "id2", "id1")); - sender2.Send(CreateGcm("ghi", "id2", "id1")); + string msg1 = (messages2.Take() as GroupCommunicationMessage<string>).Data[0]; + string msg2 = (messages2.Take() as GroupCommunicationMessage<string>).Data[0]; - string msg1 = Encoding.UTF8.GetString(messages2.Take().Data[0]); - string msg2 = Encoding.UTF8.GetString(messages2.Take().Data[0]); Assert.AreEqual("abc", msg1); Assert.AreEqual("def", msg2); - string msg3 = Encoding.UTF8.GetString(messages1.Take().Data[0]); + string msg3 = (messages1.Take() as GroupCommunicationMessage<string>).Data[0]; Assert.AreEqual("ghi", msg3); } } @@ -115,18 +121,16 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication broadcastOperatorName, masterTaskId, TopologyTypes.Flat, - GetDefaulCodecConfig(), GetDefaulDataConverterConfig()) .AddReduce<int>( reduceOperatorName, masterTaskId, TopologyTypes.Flat, - GetDefaulCodecConfig(), GetDefaulDataConverterConfig(), GetDefaulReduceFuncConfig()) .Build(); - var commGroups = CommGroupClients(groupName, numTasks, groupCommunicationDriver, commGroup); + var commGroups = CommGroupClients(groupName, numTasks, groupCommunicationDriver, commGroup, GetDefaultCodecConfig()); //for master task IBroadcastSender<int> broadcastSender = commGroups[0].GetBroadcastSender<int>(broadcastOperatorName); @@ -172,8 +176,8 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication const int numTasks = 3; const int fanOut = 2; - IConfiguration codecConfig = CodecConfiguration<int[]>.Conf - .Set(CodecConfiguration<int[]>.Codec, GenericType<IntArrayCodec>.Class) + IConfiguration codecConfig = StreamingCodecConfiguration<int[]>.Conf + .Set(StreamingCodecConfiguration<int[]>.Codec, GenericType<IntArrayStreamingCodec>.Class) .Build(); IConfiguration reduceFunctionConfig = ReduceFunctionConfiguration<int[]>.Conf @@ -196,18 +200,16 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication broadcastOperatorName, masterTaskId, TopologyTypes.Flat, - codecConfig, dataConverterConfig) .AddReduce<int[]>( reduceOperatorName, masterTaskId, TopologyTypes.Flat, - codecConfig, dataConverterConfig, reduceFunctionConfig) .Build(); - var commGroups = CommGroupClients(groupName, numTasks, groupCommunicationDriver, commGroup); + var commGroups = CommGroupClients(groupName, numTasks, groupCommunicationDriver, commGroup, codecConfig); //for master task IBroadcastSender<int[]> broadcastSender = commGroups[0].GetBroadcastSender<int[]>(broadcastOperatorName); @@ -245,18 +247,16 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication scatterOperatorName, masterTaskId, TopologyTypes.Flat, - GetDefaulCodecConfig(), GetDefaulDataConverterConfig()) .AddReduce<int>( reduceOperatorName, masterTaskId, TopologyTypes.Flat, - GetDefaulCodecConfig(), GetDefaulReduceFuncConfig(), GetDefaulDataConverterConfig()) .Build(); - List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, groupCommDriver, commGroup); + List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, groupCommDriver, commGroup, GetDefaultCodecConfig()); IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(scatterOperatorName); IReduceReceiver<int> sumReducer = commGroups[0].GetReduceReceiver<int>(reduceOperatorName); @@ -295,43 +295,6 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication } [TestMethod] - public void TestCodecConfig() - { - string groupName = "group1"; - string masterTaskId = "task0"; - string driverId = "Driver Id"; - string meansReduceOperatorName = "MeansReduce"; - string centroidsBroadcastOperatorName = "CentroidsBroadcast"; - string controlMessageBroadcastOperatorName = "ControlMessageBroadcast"; - - - IGroupCommDriver groupCommDriver = GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, 2, 5); - - IConfiguration conf1 = CodecConfiguration<Centroids>.Conf - .Set(CodecConfiguration<Centroids>.Codec, GenericType<CentroidsCodec>.Class) - .Build(); - - IConfiguration conf2 = CodecConfiguration<ControlMessage>.Conf - .Set(CodecConfiguration<ControlMessage>.Codec, GenericType<ControlMessageCodec>.Class) - .Build(); - - IConfiguration conf3 = CodecConfiguration<ProcessedResults>.Conf - .Set(CodecConfiguration<ProcessedResults>.Codec, GenericType<ProcessedResultsCodec>.Class) - .Build(); - - IConfiguration reduceFunctionConfig = ReduceFunctionConfiguration<ProcessedResults>.Conf - .Set(ReduceFunctionConfiguration<ProcessedResults>.ReduceFunction, GenericType<KMeansMasterTask.AggregateMeans>.Class) - .Build(); - - IConfiguration merged = Configurations.Merge(conf3, reduceFunctionConfig); - var group = groupCommDriver.DefaultGroup - .AddBroadcast<Centroids>(centroidsBroadcastOperatorName, masterTaskId, TopologyTypes.Flat, GetDefaulCodecConfig(), GetDefaulDataConverterConfig()) - .AddBroadcast<ControlMessage>(controlMessageBroadcastOperatorName, masterTaskId, TopologyTypes.Flat, GetDefaulCodecConfig(), GetDefaulDataConverterConfig()) - .AddReduce<ProcessedResults>(meansReduceOperatorName, masterTaskId, TopologyTypes.Flat, GetDefaulCodecConfig(), GetDefaulDataConverterConfig(), GetDefaulReduceFuncConfig()) - .Build(); - } - - [TestMethod] public void TestBroadcastOperator() { string groupName = "group1"; @@ -348,7 +311,7 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication .AddBroadcast(operatorName, masterTaskId) .Build(); - List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, groupCommDriver, commGroup); + List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, groupCommDriver, commGroup, GetDefaultCodecConfig()); IBroadcastSender<int> sender = commGroups[0].GetBroadcastSender<int>(operatorName); IBroadcastReceiver<int> receiver1 = commGroups[1].GetBroadcastReceiver<int>(operatorName); @@ -382,7 +345,7 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication .AddBroadcast(operatorName, masterTaskId) .Build(); - List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, groupCommDriver, commGroup); + List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, groupCommDriver, commGroup, GetDefaultCodecConfig()); IBroadcastSender<int> sender = commGroups[0].GetBroadcastSender<int>(operatorName); IBroadcastReceiver<int> receiver1 = commGroups[1].GetBroadcastReceiver<int>(operatorName); @@ -416,7 +379,7 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication .AddBroadcast(operatorName, masterTaskId) .Build(); - List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, groupCommDriver, commGroup); + List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, groupCommDriver, commGroup, GetDefaultCodecConfig()); IBroadcastSender<int> sender = commGroups[0].GetBroadcastSender<int>(operatorName); IBroadcastReceiver<int> receiver1 = commGroups[1].GetBroadcastReceiver<int>(operatorName); @@ -452,10 +415,10 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication IGroupCommDriver groupCommDriver = GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks); var commGroup = groupCommDriver.DefaultGroup - .AddReduce<int>(operatorName, "task0", TopologyTypes.Flat, GetDefaulCodecConfig(), GetDefaulDataConverterConfig(), GetDefaulReduceFuncConfig()) + .AddReduce<int>(operatorName, "task0", TopologyTypes.Flat, GetDefaulDataConverterConfig(), GetDefaulReduceFuncConfig()) .Build(); - List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, groupCommDriver, commGroup); + List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, groupCommDriver, commGroup, GetDefaultCodecConfig()); IReduceReceiver<int> receiver = commGroups[0].GetReduceReceiver<int>(operatorName); IReduceSender<int> sender1 = commGroups[1].GetReduceSender<int>(operatorName); @@ -487,10 +450,10 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication IGroupCommDriver groupCommDriver = GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks); var commGroup = groupCommDriver.DefaultGroup - .AddReduce<int>(operatorName, "task0", TopologyTypes.Flat, GetDefaulCodecConfig(),GetDefaulDataConverterConfig(), GetDefaulReduceFuncConfig()) + .AddReduce<int>(operatorName, "task0", TopologyTypes.Flat,GetDefaulDataConverterConfig(), GetDefaulReduceFuncConfig()) .Build(); - List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, groupCommDriver, commGroup); + List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, groupCommDriver, commGroup, GetDefaultCodecConfig()); IReduceReceiver<int> receiver = commGroups[0].GetReduceReceiver<int>(operatorName); IReduceSender<int> sender1 = commGroups[1].GetReduceSender<int>(operatorName); @@ -534,7 +497,7 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication .AddScatter(operatorName, masterTaskId) .Build(); - List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, groupCommDriver, commGroup); + List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, groupCommDriver, commGroup, GetDefaultCodecConfig()); IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName); IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName); @@ -573,7 +536,7 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication .AddScatter(operatorName, masterTaskId) .Build(); - List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, groupCommDriver, commGroup); + List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, groupCommDriver, commGroup, GetDefaultCodecConfig()); IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName); IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName); @@ -609,10 +572,10 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication IGroupCommDriver groupCommDriver = GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks); var commGroup = groupCommDriver.DefaultGroup - .AddScatter<int>(operatorName, masterTaskId, TopologyTypes.Flat, GetDefaulCodecConfig(), GetDefaulDataConverterConfig(), GetDefaulReduceFuncConfig()) + .AddScatter<int>(operatorName, masterTaskId, TopologyTypes.Flat, GetDefaulDataConverterConfig(), GetDefaulReduceFuncConfig()) .Build(); - List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, groupCommDriver, commGroup); + List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, groupCommDriver, commGroup, GetDefaultCodecConfig()); IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName); IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName); @@ -659,10 +622,10 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication IGroupCommDriver groupCommDriver = GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks); var commGroup = groupCommDriver.DefaultGroup - .AddScatter<int>(operatorName, masterTaskId, TopologyTypes.Flat, GetDefaulCodecConfig(), GetDefaulDataConverterConfig()) + .AddScatter<int>(operatorName, masterTaskId, TopologyTypes.Flat, GetDefaulDataConverterConfig()) .Build(); - List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, groupCommDriver, commGroup); + List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, groupCommDriver, commGroup, GetDefaultCodecConfig()); IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName); IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName); @@ -706,10 +669,10 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication IGroupCommDriver groupCommDriver = GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks); var commGroup = groupCommDriver.DefaultGroup - .AddScatter<int>(operatorName, masterTaskId, TopologyTypes.Flat, GetDefaulCodecConfig(), GetDefaulDataConverterConfig()) + .AddScatter<int>(operatorName, masterTaskId, TopologyTypes.Flat, GetDefaulDataConverterConfig()) .Build(); - List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, groupCommDriver, commGroup); + List<ICommunicationGroupClient> commGroups = CommGroupClients(groupName, numTasks, groupCommDriver, commGroup, GetDefaultCodecConfig()); IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName); IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName); IScatterReceiver<int> receiver2 = commGroups[2].GetScatterReceiver<int>(operatorName); @@ -744,20 +707,27 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication public void TestConfigurationBroadcastSpec() { FlatTopology<int> topology = new FlatTopology<int>("Operator", "Operator", "task1", "driverid", - new BroadcastOperatorSpec("Sender", GetDefaulCodecConfig(), GetDefaulDataConverterConfig())); + new BroadcastOperatorSpec("Sender", GetDefaultCodecConfig(), GetDefaulDataConverterConfig())); topology.AddTask("task1"); var conf = topology.GetTaskConfiguration("task1"); - ICodec<int> codec = TangFactory.GetTang().NewInjector(conf).GetInstance<ICodec<int>>(); - Assert.AreEqual(3, codec.Decode(codec.Encode(3))); + IStreamingCodec<int> codec = TangFactory.GetTang().NewInjector(conf).GetInstance<IStreamingCodec<int>>(); + + var stream = new MemoryStream(); + IDataWriter writer = new StreamDataWriter(stream); + codec.Write(3, writer); + stream.Position = 0; + IDataReader reader = new StreamDataReader(stream); + int res = codec.Read(reader); + Assert.AreEqual(3, res); } [TestMethod] public void TestConfigurationReduceSpec() { FlatTopology<int> topology = new FlatTopology<int>("Operator", "Group", "task1", "driverid", - new ReduceOperatorSpec("task1", Configurations.Merge(GetDefaulCodecConfig(), GetDefaulDataConverterConfig(), GetDefaulReduceFuncConfig()))); + new ReduceOperatorSpec("task1", Configurations.Merge(GetDefaultCodecConfig(), GetDefaulDataConverterConfig(), GetDefaulReduceFuncConfig()))); topology.AddTask("task1"); var conf2 = topology.GetTaskConfiguration("task1"); @@ -781,10 +751,41 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication return groupCommDriver; } - public static List<ICommunicationGroupClient> CommGroupClients(string groupName, int numTasks, IGroupCommDriver groupCommDriver, ICommunicationGroupDriver commGroupDriver) + [TestMethod] + public async Task TestCodecToStreamingCodecConfiguration() + { + var config = CodecToStreamingCodecConfiguration<int>.Conf + .Set(CodecToStreamingCodecConfiguration<int>.Codec, GenericType<IntCodec>.Class) + .Build(); + + IStreamingCodec<PipelineMessage<int>> streamingCodec = + TangFactory.GetTang().NewInjector(config).GetInstance<IStreamingCodec<PipelineMessage<int>>>(); + + CancellationToken token = new CancellationToken(); + + int obj = 5; + PipelineMessage<int> message = new PipelineMessage<int>(obj, true); + var stream = new MemoryStream(); + IDataWriter writer = new StreamDataWriter(stream); + streamingCodec.Write(message, writer); + PipelineMessage<int> message1 = new PipelineMessage<int>(obj + 1, false); + await streamingCodec.WriteAsync(message1, writer, token); + + stream.Position = 0; + IDataReader reader = new StreamDataReader(stream); + var res1 = streamingCodec.Read(reader); + var res2 = await streamingCodec.ReadAsync(reader, token); + Assert.AreEqual(obj, res1.Data); + Assert.AreEqual(obj + 1, res2.Data); + Assert.AreEqual(true, res1.IsLast); + Assert.AreEqual(false, res2.IsLast); + } + + public static List<ICommunicationGroupClient> CommGroupClients(string groupName, int numTasks, IGroupCommDriver groupCommDriver, ICommunicationGroupDriver commGroupDriver, IConfiguration userServiceConfig) { List<ICommunicationGroupClient> commGroups = new List<ICommunicationGroupClient>(); IConfiguration serviceConfig = groupCommDriver.GetServiceConfiguration(); + serviceConfig = Configurations.Merge(serviceConfig, userServiceConfig); List<IConfiguration> partialConfigs = new List<IConfiguration>(); for (int i = 0; i < numTasks; i++) @@ -820,24 +821,31 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication return commGroups; } - public static NetworkService<GroupCommunicationMessage> BuildNetworkService( - IPEndPoint nameServerEndpoint, IObserver<NsMessage<GroupCommunicationMessage>> handler) + public static IInjector BuildNetworkServiceInjector( + IPEndPoint nameServerEndpoint, IObserver<WritableNsMessage<GeneralGroupCommunicationMessage>> handler) { - var injector = TangFactory.GetTang().NewInjector(); - var remoteManagerFactory = injector.GetInstance<IRemoteManagerFactory>(); - return new NetworkService<GroupCommunicationMessage>( - 0, handler, new StringIdentifierFactory(), - new GroupCommunicationMessageCodec(), - new NameClient(nameServerEndpoint.Address.ToString(), - nameServerEndpoint.Port), - remoteManagerFactory); - + var config = TangFactory.GetTang().NewConfigurationBuilder() + .BindNamedParameter(typeof (NamingConfigurationOptions.NameServerAddress), + nameServerEndpoint.Address.ToString()) + .BindNamedParameter(typeof (NamingConfigurationOptions.NameServerPort), + nameServerEndpoint.Port.ToString()) + .BindNamedParameter(typeof (NetworkServiceOptions.NetworkServicePort), + (0).ToString(CultureInfo.InvariantCulture)) + .BindImplementation(GenericType<INameClient>.Class, GenericType<NameClient>.Class) + .BindImplementation(GenericType<IStreamingCodec<string>>.Class, GenericType<StringStreamingCodec>.Class) + .Build(); + + var injector = TangFactory.GetTang().NewInjector(config); + injector.BindVolatileInstance( + GenericType<IObserver<WritableNsMessage<GeneralGroupCommunicationMessage>>>.Class, handler); + + return injector; } - private GroupCommunicationMessage CreateGcm(string message, string from, string to) + private GroupCommunicationMessage<string> CreateGcmStringType(string message, string from, string to) { - byte[] data = Encoding.UTF8.GetBytes(message); - return new GroupCommunicationMessage("g1", "op1", from, to, data, MessageType.Data); + var stringCodec = TangFactory.GetTang().NewInjector().GetInstance<StringStreamingCodec>(); + return new GroupCommunicationMessage<string>("g1", "op1", from, to, message, MessageType.Data, stringCodec); } private static void ScatterReceiveReduce(IScatterReceiver<int> receiver, IReduceSender<int> sumSender) @@ -852,21 +860,21 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication return Enumerable.Range(1, n).Sum(); } - private IConfiguration GetDefaulCodecConfig() + private static IConfiguration GetDefaultCodecConfig() { - return CodecConfiguration<int>.Conf - .Set(CodecConfiguration<int>.Codec, GenericType<IntCodec>.Class) + return StreamingCodecConfiguration<int>.Conf + .Set(StreamingCodecConfiguration<int>.Codec, GenericType<IntStreamingCodec>.Class) .Build(); } - private IConfiguration GetDefaulReduceFuncConfig() + private static IConfiguration GetDefaulReduceFuncConfig() { return ReduceFunctionConfiguration<int>.Conf .Set(ReduceFunctionConfiguration<int>.ReduceFunction, GenericType<SumFunction>.Class) .Build(); } - private IConfiguration GetDefaulDataConverterConfig() + private static IConfiguration GetDefaulDataConverterConfig() { return PipelineDataConverterConfiguration<int>.Conf .Set(PipelineDataConverterConfiguration<int>.DataConverter, GenericType<DefaultPipelineDataConverter<int>>.Class) @@ -937,33 +945,6 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication } } - class IntArrayCodec : ICodec<int[]> - { - [Inject] - private IntArrayCodec() - { - } - - public byte[] Encode(int[] obj) - { - byte[] result = new byte[sizeof(Int32) * obj.Length]; - Buffer.BlockCopy(obj, 0, result, 0, result.Length); - return result; - } - - public int[] Decode(byte[] data) - { - if (data.Length % sizeof(Int32) != 0) - { - throw new Exception("error inside integer array decoder, byte array length not a multiple of interger size"); - } - - int[] result = new int[data.Length / sizeof(Int32)]; - Buffer.BlockCopy(data, 0, result, 0, data.Length); - return result; - } - } - class PipelineIntDataConverter : IPipelineDataConverter<int[]> { readonly int _chunkSize; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs index f8af722..88ae9b3 100644 --- a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs +++ b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/GroupCommunicationTreeTopologyTests.cs @@ -21,8 +21,10 @@ using System.Collections.Generic; using System.Globalization; using System.Linq; using Microsoft.VisualStudio.TestTools.UnitTesting; +using Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs; using Org.Apache.REEF.Network.Group.Config; using Org.Apache.REEF.Network.Group.Driver; +using Org.Apache.REEF.Network.Group.Driver.Impl; using Org.Apache.REEF.Network.Group.Operators; using Org.Apache.REEF.Network.Group.Operators.Impl; using Org.Apache.REEF.Network.Group.Pipelining.Impl; @@ -42,7 +44,7 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication public void TestTreeTopology() { TreeTopology<int> topology = new TreeTopology<int>("Operator", "Operator", "task1", "driverid", - new BroadcastOperatorSpec("task1", GetDefaulCodecConfig(), GetDefaulDataConverterConfig()), 2); + new BroadcastOperatorSpec("task1", GetDefaultDataConverterConfig()), 2); for (int i = 1; i < 8; i++) { string taskid = "task" + i; @@ -68,10 +70,10 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication var groupCommDriver = GroupCommunicationTests.GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks); ICommunicationGroupDriver commGroup = groupCommDriver.DefaultGroup - .AddReduce<int>(operatorName, masterTaskId, TopologyTypes.Tree, GetDefaulCodecConfig(), GetDefaulDataConverterConfig(), GetDefaulReduceFuncConfig()) + .AddReduce<int>(operatorName, masterTaskId, TopologyTypes.Tree, GetDefaultDataConverterConfig(), GetDefaultReduceFuncConfig()) .Build(); - var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup); + var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup, GetDefaultCodecConfig()); IReduceReceiver<int> receiver = commGroups[0].GetReduceReceiver<int>(operatorName); IReduceSender<int> sender1 = commGroups[1].GetReduceSender<int>(operatorName); @@ -124,10 +126,10 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication var groupCommDriver = GroupCommunicationTests.GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks); ICommunicationGroupDriver commGroup = groupCommDriver.DefaultGroup - .AddBroadcast<int>(operatorName, masterTaskId, TopologyTypes.Tree, GetDefaulCodecConfig(), GetDefaulDataConverterConfig(), GetDefaulReduceFuncConfig()) + .AddBroadcast<int>(operatorName, masterTaskId, TopologyTypes.Tree, GetDefaultDataConverterConfig(), GetDefaultReduceFuncConfig()) .Build(); - var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup); + var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup, GetDefaultCodecConfig()); IBroadcastSender<int> sender = commGroups[0].GetBroadcastSender<int>(operatorName); IBroadcastReceiver<int> receiver1 = commGroups[1].GetBroadcastReceiver<int>(operatorName); @@ -204,18 +206,16 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication broadcastOperatorName, masterTaskId, TopologyTypes.Tree, - GetDefaulCodecConfig(), - GetDefaulDataConverterConfig()) + GetDefaultDataConverterConfig()) .AddReduce<int>( reduceOperatorName, masterTaskId, TopologyTypes.Tree, - GetDefaulCodecConfig(), - GetDefaulDataConverterConfig(), - GetDefaulReduceFuncConfig()) + GetDefaultDataConverterConfig(), + GetDefaultReduceFuncConfig()) .Build(); - var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup); + var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup, GetDefaultCodecConfig()); //for master task IBroadcastSender<int> broadcastSender = commGroups[0].GetBroadcastSender<int>(broadcastOperatorName); @@ -317,10 +317,10 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication var groupCommDriver = GroupCommunicationTests.GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks); ICommunicationGroupDriver commGroup = groupCommDriver.DefaultGroup - .AddScatter<int>(operatorName, masterTaskId, TopologyTypes.Tree, GetDefaulCodecConfig(), GetDefaulDataConverterConfig()) + .AddScatter<int>(operatorName, masterTaskId, TopologyTypes.Tree, GetDefaultDataConverterConfig()) .Build(); - var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup); + var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup, GetDefaultCodecConfig()); IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName); IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName); @@ -362,10 +362,10 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication var groupCommDriver = GroupCommunicationTests.GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks); ICommunicationGroupDriver commGroup = groupCommDriver.DefaultGroup - .AddScatter<int>(operatorName, masterTaskId, TopologyTypes.Tree, GetDefaulCodecConfig(), GetDefaulDataConverterConfig()) + .AddScatter<int>(operatorName, masterTaskId, TopologyTypes.Tree, GetDefaultDataConverterConfig()) .Build(); - var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup); + var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup, GetDefaultCodecConfig()); IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName); IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName); @@ -416,10 +416,10 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication var groupCommDriver = GroupCommunicationTests.GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks); ICommunicationGroupDriver commGroup = groupCommDriver.DefaultGroup - .AddScatter<int>(operatorName, masterTaskId, TopologyTypes.Tree, GetDefaulCodecConfig(), GetDefaulDataConverterConfig()) + .AddScatter<int>(operatorName, masterTaskId, TopologyTypes.Tree, GetDefaultDataConverterConfig()) .Build(); - var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup); + var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup, GetDefaultCodecConfig()); IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName); IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName); @@ -467,10 +467,10 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication var groupCommDriver = GroupCommunicationTests.GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks); ICommunicationGroupDriver commGroup = groupCommDriver.DefaultGroup - .AddScatter<int>(operatorName, masterTaskId, TopologyTypes.Tree, GetDefaulCodecConfig(), GetDefaulDataConverterConfig()) + .AddScatter<int>(operatorName, masterTaskId, TopologyTypes.Tree, GetDefaultDataConverterConfig()) .Build(); - var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup); + var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup, GetDefaultCodecConfig()); IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName); IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName); @@ -519,10 +519,10 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication var groupCommDriver = GroupCommunicationTests.GetInstanceOfGroupCommDriver(driverId, masterTaskId, groupName, fanOut, numTasks); ICommunicationGroupDriver commGroup = groupCommDriver.DefaultGroup - .AddScatter<int>(operatorName, masterTaskId, TopologyTypes.Tree, GetDefaulCodecConfig(), GetDefaulDataConverterConfig()) + .AddScatter<int>(operatorName, masterTaskId, TopologyTypes.Tree, GetDefaultDataConverterConfig()) .Build(); - var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup); + var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup, GetDefaultCodecConfig()); IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(operatorName); IScatterReceiver<int> receiver1 = commGroups[1].GetScatterReceiver<int>(operatorName); @@ -586,18 +586,16 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication scatterOperatorName, masterTaskId, TopologyTypes.Tree, - GetDefaulCodecConfig(), - GetDefaulDataConverterConfig()) + GetDefaultDataConverterConfig()) .AddReduce<int>( reduceOperatorName, masterTaskId, TopologyTypes.Tree, - GetDefaulCodecConfig(), - GetDefaulDataConverterConfig(), - GetDefaulReduceFuncConfig()) + GetDefaultDataConverterConfig(), + GetDefaultReduceFuncConfig()) .Build(); - var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup); + var commGroups = GroupCommunicationTests.CommGroupClients(groupName, numTasks, groupCommDriver, commGroup, GetDefaultCodecConfig()); IScatterSender<int> sender = commGroups[0].GetScatterSender<int>(scatterOperatorName); IReduceReceiver<int> sumReducer = commGroups[0].GetReduceReceiver<int>(reduceOperatorName); @@ -646,21 +644,21 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication Assert.AreEqual(sum, 6325); } - private IConfiguration GetDefaulCodecConfig() + private IConfiguration GetDefaultCodecConfig() { - return CodecConfiguration<int>.Conf - .Set(CodecConfiguration<int>.Codec, GenericType<IntCodec>.Class) + return StreamingCodecConfiguration<int>.Conf + .Set(StreamingCodecConfiguration<int>.Codec, GenericType<IntStreamingCodec>.Class) .Build(); } - private IConfiguration GetDefaulReduceFuncConfig() + private IConfiguration GetDefaultReduceFuncConfig() { return ReduceFunctionConfiguration<int>.Conf .Set(ReduceFunctionConfiguration<int>.ReduceFunction, GenericType<SumFunction>.Class) .Build(); } - private IConfiguration GetDefaulDataConverterConfig() + private IConfiguration GetDefaultDataConverterConfig() { return PipelineDataConverterConfiguration<int>.Conf .Set(PipelineDataConverterConfiguration<int>.DataConverter, GenericType<DefaultPipelineDataConverter<int>>.Class) http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/StreamingCodecTests.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/StreamingCodecTests.cs b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/StreamingCodecTests.cs index 6dea9f1..d4f0647 100644 --- a/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/StreamingCodecTests.cs +++ b/lang/cs/Org.Apache.REEF.Network.Tests/GroupCommunication/StreamingCodecTests.cs @@ -50,12 +50,15 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication IStreamingCodec<double[]> doubleArrCodec = injector.GetInstance<DoubleArrayStreamingCodec>(); IStreamingCodec<float[]> floatArrCodec = injector.GetInstance<FloatArrayStreamingCodec>(); + IStreamingCodec<string> stringCodec = injector.GetInstance<StringStreamingCodec>(); + CancellationToken token = new CancellationToken(); int obj = 5; int[] intArr = {1, 2}; double[] doubleArr = { 1, 2 }; float[] floatArr = { 1, 2 }; + string stringObj = "hello"; var stream = new MemoryStream(); IDataWriter writer = new StreamDataWriter(stream); @@ -71,6 +74,8 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication await doubleArrCodec.WriteAsync(doubleArr, writer, token); floatArrCodec.Write(floatArr, writer); await floatArrCodec.WriteAsync(floatArr, writer, token); + stringCodec.Write(stringObj, writer); + await stringCodec.WriteAsync(stringObj, writer, token); stream.Position = 0; IDataReader reader = new StreamDataReader(stream); @@ -86,13 +91,17 @@ namespace Org.Apache.REEF.Network.Tests.GroupCommunication double[] resArr4 = await doubleArrCodec.ReadAsync(reader, token); float[] resArr5 = floatArrCodec.Read(reader); float[] resArr6 = await floatArrCodec.ReadAsync(reader, token); - + string resArr7 = stringCodec.Read(reader); + string resArr8 = await stringCodec.ReadAsync(reader, token); + Assert.AreEqual(obj, res1); Assert.AreEqual(obj + 1, res2); Assert.AreEqual(obj + 2, res3); Assert.AreEqual(obj + 3, res4); Assert.AreEqual(obj + 4, res5); Assert.AreEqual(obj + 5, res6); + Assert.AreEqual(stringObj, resArr7); + Assert.AreEqual(stringObj, resArr8); for (int i = 0; i < intArr.Length; i++) { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Codec/GcmMessageProto.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Codec/GcmMessageProto.cs b/lang/cs/Org.Apache.REEF.Network/Group/Codec/GcmMessageProto.cs deleted file mode 100644 index 755851a..0000000 --- a/lang/cs/Org.Apache.REEF.Network/Group/Codec/GcmMessageProto.cs +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using Org.Apache.REEF.Network.Group.Driver.Impl; -using ProtoBuf; - -namespace Org.Apache.REEF.Network.Group.Codec -{ - [ProtoContract] - public class GcmMessageProto - { - [ProtoMember(1)] - public byte[][] Data { get; set; } - - [ProtoMember(2)] - public string OperatorName { get; set; } - - [ProtoMember(3)] - public string GroupName { get; set; } - - [ProtoMember(4)] - public string Source { get; set; } - - [ProtoMember(5)] - public string Destination { get; set; } - - [ProtoMember(6)] - public MessageType MsgType { get; set; } - - public static GcmMessageProto Create(GroupCommunicationMessage gcm) - { - return new GcmMessageProto() - { - Data = gcm.Data, - OperatorName = gcm.OperatorName, - GroupName = gcm.GroupName, - Source = gcm.Source, - Destination = gcm.Destination, - MsgType = gcm.MsgType, - }; - } - - public GroupCommunicationMessage ToGcm() - { - return new GroupCommunicationMessage( - GroupName, - OperatorName, - Source, - Destination, - Data, - MsgType); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Codec/GroupCommunicationMessageCodec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Codec/GroupCommunicationMessageCodec.cs b/lang/cs/Org.Apache.REEF.Network/Group/Codec/GroupCommunicationMessageCodec.cs deleted file mode 100644 index 081804f..0000000 --- a/lang/cs/Org.Apache.REEF.Network/Group/Codec/GroupCommunicationMessageCodec.cs +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -using System.IO; -using Org.Apache.REEF.Network.Group.Driver.Impl; -using Org.Apache.REEF.Tang.Annotations; -using Org.Apache.REEF.Wake.Remote; -using ProtoBuf; - -namespace Org.Apache.REEF.Network.Group.Codec -{ - /// <summary> - /// Used to serialize GroupCommunicationMessages. - /// </summary> - public class GroupCommunicationMessageCodec : ICodec<GroupCommunicationMessage> - { - /// <summary> - /// Create a new GroupCommunicationMessageCodec. - /// </summary> - [Inject] - public GroupCommunicationMessageCodec() - { - } - - /// <summary> - /// Serialize the GroupCommunicationObject into a byte array using Protobuf. - /// </summary> - /// <param name="obj">The object to serialize.</param> - /// <returns>The serialized GroupCommunicationMessage in byte array form</returns> - public byte[] Encode(GroupCommunicationMessage obj) - { - GcmMessageProto proto = GcmMessageProto.Create(obj); - using (var stream = new MemoryStream()) - { - Serializer.Serialize(stream, proto); - return stream.ToArray(); - } - } - - /// <summary> - /// Deserialize the byte array into a GroupCommunicationMessage using Protobuf. - /// </summary> - /// <param name="data">The byte array to deserialize</param> - /// <returns>The deserialized GroupCommunicationMessage object.</returns> - public GroupCommunicationMessage Decode(byte[] data) - { - using (var stream = new MemoryStream(data)) - { - GcmMessageProto proto = Serializer.Deserialize<GcmMessageProto>(stream); - return proto.ToGcm(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Config/CodecToStreamingCodecConfiguration.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Config/CodecToStreamingCodecConfiguration.cs b/lang/cs/Org.Apache.REEF.Network/Group/Config/CodecToStreamingCodecConfiguration.cs new file mode 100644 index 0000000..a270272 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/Group/Config/CodecToStreamingCodecConfiguration.cs @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using Org.Apache.REEF.Network.Group.Pipelining; +using Org.Apache.REEF.Network.StreamingCodec; +using Org.Apache.REEF.Tang.Formats; +using Org.Apache.REEF.Tang.Util; +using Org.Apache.REEF.Wake.Remote; + +namespace Org.Apache.REEF.Network.Group.Config +{ + public sealed class CodecToStreamingCodecConfiguration<T> : ConfigurationModuleBuilder + { + /// <summary> + /// RequiredImpl for Codec. Client needs to set implementation for this paramter + /// </summary> + public static readonly RequiredImpl<ICodec<T>> Codec = new RequiredImpl<ICodec<T>>(); + + /// <summary> + /// Configuration Module for Codec + /// </summary> + public static ConfigurationModule Conf = new CodecToStreamingCodecConfiguration<T>() + .BindImplementation(GenericType<ICodec<T>>.Class, Codec) + .BindImplementation(GenericType<IStreamingCodec<T>>.Class, GenericType<CodecToStreamingCodec<T>>.Class) + .BindImplementation(GenericType<IStreamingCodec<PipelineMessage<T>>>.Class, GenericType<StreamingPipelineMessageCodec<T>>.Class) + .Build(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs index 3cc7270..240d244 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/ICommunicationGroupDriver.cs @@ -17,14 +17,9 @@ * under the License. */ -using System; using System.Collections.Generic; -using Org.Apache.REEF.Network.Group.Operators; -using Org.Apache.REEF.Network.Group.Operators.Impl; -using Org.Apache.REEF.Network.Group.Pipelining; using Org.Apache.REEF.Network.Group.Topology; using Org.Apache.REEF.Tang.Interface; -using Org.Apache.REEF.Wake.Remote; namespace Org.Apache.REEF.Network.Group.Driver { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs index 07581ba..5ebb357 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/CommunicationGroupDriver.cs @@ -17,23 +17,20 @@ * under the License. */ +using System; using System.Collections.Generic; using System.Reflection; -using System.Threading; using Org.Apache.REEF.Network.Group.Config; using Org.Apache.REEF.Network.Group.Operators.Impl; -using Org.Apache.REEF.Network.Group.Pipelining; using Org.Apache.REEF.Network.Group.Pipelining.Impl; using Org.Apache.REEF.Network.Group.Topology; using Org.Apache.REEF.Tang.Exceptions; using Org.Apache.REEF.Tang.Formats; -using Org.Apache.REEF.Tang.Implementations.Configuration; using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Tang.Util; using Org.Apache.REEF.Utilities.Logging; -using Org.Apache.REEF.Wake.Remote; -using Org.Apache.REEF.Wake.Remote.Impl; + namespace Org.Apache.REEF.Network.Group.Driver.Impl { @@ -42,7 +39,8 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl /// All operators in the same Communication Group run on the the /// same set of tasks. /// </summary> - public class CommunicationGroupDriver : ICommunicationGroupDriver + // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295.] + public sealed class CommunicationGroupDriver : ICommunicationGroupDriver { private static readonly Logger LOGGER = Logger.GetLogger(typeof (CommunicationGroupDriver)); @@ -141,7 +139,7 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl /// <returns>The same CommunicationGroupDriver with the added Broadcast operator info</returns> public ICommunicationGroupDriver AddBroadcast(string operatorName, string masterTaskId, TopologyTypes topologyType = TopologyTypes.Flat) { - return AddBroadcast<int>( operatorName, masterTaskId, topologyType, GetDefaulConfiguration()); + return AddBroadcast<int>( operatorName, masterTaskId, topologyType, GetDefaultConfiguration()); } /// <summary> @@ -236,7 +234,7 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl public ICommunicationGroupDriver AddScatter(string operatorName, string senderId, TopologyTypes topologyType = TopologyTypes.Flat) { - return AddScatter<int>(operatorName, senderId, topologyType, GetDefaulConfiguration()); + return AddScatter<int>(operatorName, senderId, topologyType, GetDefaultConfiguration()); } /// <summary> @@ -343,18 +341,14 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl return (IConfiguration) info.Invoke(topology, new[] {(object) taskId}); } - private IConfiguration[] GetDefaulConfiguration() + private IConfiguration[] GetDefaultConfiguration() { List<IConfiguration> list = new List<IConfiguration>(); - IConfiguration codecConfig = CodecConfiguration<int>.Conf - .Set(CodecConfiguration<int>.Codec, GenericType<IntCodec>.Class) - .Build(); IConfiguration dataConverterConfig = PipelineDataConverterConfiguration<int>.Conf .Set(PipelineDataConverterConfiguration<int>.DataConverter, GenericType<DefaultPipelineDataConverter<int>>.Class) .Build(); - list.Add(codecConfig); list.Add(dataConverterConfig); return list.ToArray(); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GeneralGroupCommunicationMessage.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GeneralGroupCommunicationMessage.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GeneralGroupCommunicationMessage.cs new file mode 100644 index 0000000..e807a4e --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GeneralGroupCommunicationMessage.cs @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System; +using System.Threading; +using Org.Apache.REEF.Wake.Remote; +using Org.Apache.REEF.Tang.Annotations; + +namespace Org.Apache.REEF.Network.Group.Driver.Impl +{ + /// <summary> + /// Messages sent by MPI Operators. This is the abstract class inherited by + /// WritableGroupCommunicationMessage but seen by Network Service + /// </summary> + // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295. + public abstract class GeneralGroupCommunicationMessage : IWritable + { + /// <summary> + /// Empty constructor to allow instantiation by reflection + /// </summary> + protected GeneralGroupCommunicationMessage() + { + } + + /// <summary> + /// Create new CommunicationGroupMessage. + /// </summary> + /// <param name="groupName">The name of the communication group</param> + /// <param name="operatorName">The name of the MPI operator</param> + /// <param name="source">The message source</param> + /// <param name="destination">The message destination</param> + /// <param name="messageType">The type of message to send</param> + protected GeneralGroupCommunicationMessage( + string groupName, + string operatorName, + string source, + string destination, + MessageType messageType) + { + GroupName = groupName; + OperatorName = operatorName; + Source = source; + Destination = destination; + MsgType = messageType; + } + + /// <summary> + /// Returns the Communication Group name. + /// </summary> + public string GroupName { get; internal set; } + + /// <summary> + /// Returns the MPI Operator name. + /// </summary> + public string OperatorName { get; internal set; } + + /// <summary> + /// Returns the source of the message. + /// </summary> + public string Source { get; internal set; } + + /// <summary> + /// Returns the destination of the message. + /// </summary> + public string Destination { get; internal set; } + + /// <summary> + /// Returns the type of message being sent. + /// </summary> + public MessageType MsgType { get; internal set; } + + /// <summary> + /// Read the class fields. + /// </summary> + /// <param name="reader">The reader from which to read </param> + public abstract void Read(IDataReader reader); + + /// <summary> + /// Writes the class fields. + /// </summary> + /// <param name="writer">The writer to which to write</param> + public abstract void Write(IDataWriter writer); + + /// <summary> + /// Read the class fields. + /// </summary> + /// <param name="reader">The reader from which to read </param> + /// <param name="token">The cancellation token</param> + public abstract System.Threading.Tasks.Task ReadAsync(IDataReader reader, CancellationToken token); + + /// <summary> + /// Writes the class fields. + /// </summary> + /// <param name="writer">The writer to which to write</param> + /// <param name="token">The cancellation token</param> + public abstract System.Threading.Tasks.Task WriteAsync(IDataWriter writer, CancellationToken token); + } +} \ No newline at end of file
