Repository: incubator-reef Updated Branches: refs/heads/master b3c1517bf -> 1d2003346
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs index 3ccf31f..ade5834 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommDriver.cs @@ -25,19 +25,16 @@ using System.Threading; using Org.Apache.REEF.Common.Io; using Org.Apache.REEF.Common.Services; using Org.Apache.REEF.Driver.Context; -using Org.Apache.REEF.Network.Group.Codec; using Org.Apache.REEF.Network.Group.Config; using Org.Apache.REEF.Network.Group.Task.Impl; using Org.Apache.REEF.Network.Naming; using Org.Apache.REEF.Network.NetworkService; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Tang.Formats; -using Org.Apache.REEF.Tang.Implementations.Configuration; using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Tang.Util; using Org.Apache.REEF.Utilities.Logging; -using Org.Apache.REEF.Wake.Remote; namespace Org.Apache.REEF.Network.Group.Driver.Impl { @@ -45,7 +42,8 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl /// Used to create Communication Groups for Group Communication Operators on the Reef driver. /// Also manages configuration for Group Communication tasks/services. /// </summary> - public class GroupCommDriver : IGroupCommDriver + // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295. + public sealed class GroupCommDriver : IGroupCommDriver { private const string MasterTaskContextName = "MasterTaskContext"; private const string SlaveTaskContextName = "SlaveTaskContext"; @@ -53,17 +51,16 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl private static Logger LOGGER = Logger.GetLogger(typeof(GroupCommDriver)); private readonly string _driverId; - private readonly string _nameServerAddr; + private readonly string _nameServerAddr; private readonly int _nameServerPort; private int _contextIds; - private int _fanOut; - private string _groupName; + private readonly int _fanOut; + private readonly string _groupName; - private readonly Dictionary<string, ICommunicationGroupDriver> _commGroups; + private readonly Dictionary<string, ICommunicationGroupDriver> _commGroups; private readonly AvroConfigurationSerializer _configSerializer; - private readonly INameServer _nameServer; - + /// <summary> /// Create a new GroupCommunicationDriver object. /// </summary> @@ -75,7 +72,7 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl /// <param name="configSerializer">Used to serialize task configuration</param> /// <param name="nameServer">Used to map names to ip adresses</param> [Inject] - public GroupCommDriver( + private GroupCommDriver( [Parameter(typeof(GroupCommConfigurationOptions.DriverId))] string driverId, [Parameter(typeof(GroupCommConfigurationOptions.MasterTaskId))] string masterTaskId, [Parameter(typeof(GroupCommConfigurationOptions.FanOut))] int fanOut, @@ -92,9 +89,8 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl _configSerializer = configSerializer; _commGroups = new Dictionary<string, ICommunicationGroupDriver>(); - _nameServer = nameServer; - IPEndPoint localEndpoint = _nameServer.LocalEndpoint; + IPEndPoint localEndpoint = nameServer.LocalEndpoint; _nameServerAddr = localEndpoint.Address.ToString(); _nameServerPort = localEndpoint.Port; @@ -123,11 +119,13 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl { throw new ArgumentNullException("groupName"); } - else if (numTasks < 1) + + if (numTasks < 1) { - throw new ArgumentException("NumTasks must be greater than 0"); + throw new ArgumentException("NumTasks must be greater than 0"); } - else if (_commGroups.ContainsKey(groupName)) + + if (_commGroups.ContainsKey(groupName)) { throw new ArgumentException("Group Name already registered with GroupCommunicationDriver"); } @@ -144,8 +142,8 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl public IConfiguration GetContextConfiguration() { int contextNum = Interlocked.Increment(ref _contextIds); - string id = (contextNum == 0) - ? MasterTaskContextName + string id = (contextNum == 0) + ? MasterTaskContextName : GetSlaveTaskContextName(contextNum); return ContextConfiguration.ConfigurationModule @@ -160,16 +158,13 @@ namespace Org.Apache.REEF.Network.Group.Driver.Impl public IConfiguration GetServiceConfiguration() { IConfiguration serviceConfig = ServiceConfiguration.ConfigurationModule - .Set(ServiceConfiguration.Services, GenericType<NetworkService<GroupCommunicationMessage>>.Class) + .Set(ServiceConfiguration.Services, GenericType<WritableNetworkService<GeneralGroupCommunicationMessage>>.Class) .Build(); return TangFactory.GetTang().NewConfigurationBuilder(serviceConfig) .BindImplementation( - GenericType<IObserver<NsMessage<GroupCommunicationMessage>>>.Class, + GenericType<IObserver<WritableNsMessage<GeneralGroupCommunicationMessage>>>.Class, GenericType<GroupCommNetworkObserver>.Class) - .BindImplementation( - GenericType<ICodec<GroupCommunicationMessage>>.Class, - GenericType<GroupCommunicationMessageCodec>.Class) .BindNamedParameter<NamingConfigurationOptions.NameServerAddress, string>( GenericType<NamingConfigurationOptions.NameServerAddress>.Class, _nameServerAddr) http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs index dd67f8c..33f9c92 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Driver/Impl/GroupCommunicationMessage.cs @@ -17,91 +17,191 @@ * under the License. */ +using System; +using System.Threading; +using Org.Apache.REEF.Network.StreamingCodec; +using Org.Apache.REEF.Wake.Remote; +using Org.Apache.REEF.Tang.Annotations; + namespace Org.Apache.REEF.Network.Group.Driver.Impl { /// <summary> - /// Messages sent by Group Communication Operators + /// Messages sent by MPI Operators. This is the Writable version of GroupCommunicationMessage + /// class and will eventually replace it once everybody agrees with the design /// </summary> - public class GroupCommunicationMessage + // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295. + public sealed class GroupCommunicationMessage<T> : GeneralGroupCommunicationMessage { + private readonly IStreamingCodec<T> _codec; + + /// <summary> + /// Empty constructor to allow instantiation by reflection + /// </summary> + [Inject] + private GroupCommunicationMessage(IStreamingCodec<T> codec) + { + _codec = codec; + } + /// <summary> /// Create new CommunicationGroupMessage. /// </summary> /// <param name="groupName">The name of the communication group</param> - /// <param name="operatorName">The name of the Group Communication operator</param> + /// <param name="operatorName">The name of the MPI operator</param> /// <param name="source">The message source</param> /// <param name="destination">The message destination</param> - /// <param name="data">The actual byte array of data</param> + /// <param name="message">The actual Writable message</param> /// <param name="messageType">The type of message to send</param> + /// <param name="codec">Streaming Codec</param> public GroupCommunicationMessage( string groupName, string operatorName, string source, string destination, - byte[] data, - MessageType messageType) + T message, + MessageType messageType, + IStreamingCodec<T> codec) + : base(groupName, operatorName, source, destination, messageType) { - GroupName = groupName; - OperatorName = operatorName; - Source = source; - Destination = destination; - Data = new[] { data }; - MsgType = messageType; + _codec = codec; + Data = new T[] { message }; } /// <summary> /// Create new CommunicationGroupMessage. /// </summary> /// <param name="groupName">The name of the communication group</param> - /// <param name="operatorName">The name of the Group Communication operator</param> + /// <param name="operatorName">The name of the MPI operator</param> /// <param name="source">The message source</param> /// <param name="destination">The message destination</param> - /// <param name="data">The actual byte array of data</param> + /// <param name="message">The actual Writable message array</param> /// <param name="messageType">The type of message to send</param> + /// <param name="codec">Streaming Codec</param> public GroupCommunicationMessage( string groupName, string operatorName, string source, string destination, - byte[][] data, - MessageType messageType) + T[] message, + MessageType messageType, + IStreamingCodec<T> codec) + : base(groupName, operatorName, source, destination, messageType) { - GroupName = groupName; - OperatorName = operatorName; - Source = source; - Destination = destination; - Data = data; - MsgType = messageType; + _codec = codec; + Data = message; } /// <summary> - /// Returns the Communication Group name. + /// Returns the array of messages. /// </summary> - public string GroupName { get; private set; } + public T[] Data + { + get; + set; + } /// <summary> - /// Returns the Group Communication Operator name. + /// Read the class fields. /// </summary> - public string OperatorName { get; private set; } + /// <param name="reader">The reader from which to read </param> + public override void Read(IDataReader reader) + { + GroupName = reader.ReadString(); + OperatorName = reader.ReadString(); + Source = reader.ReadString(); + Destination = reader.ReadString(); - /// <summary> - /// Returns the source of the message. - /// </summary> - public string Source { get; private set; } + int dataCount = reader.ReadInt32(); + + if (dataCount == 0) + { + throw new Exception("Data Count in Group COmmunication Message cannot be zero"); + } + + MsgType = (MessageType)Enum.Parse(typeof(MessageType), reader.ReadString()); + Data = new T[dataCount]; + + for (int index = 0; index < dataCount; index++) + { + Data[index] = _codec.Read(reader); + + if (Data[index] == null) + { + throw new Exception("message instance cannot be created from the IDataReader in Group Communication Message"); + } + } + } /// <summary> - /// Returns the destination of the message. + /// Writes the class fields. /// </summary> - public string Destination { get; private set; } + /// <param name="writer">The writer to which to write</param> + public override void Write(IDataWriter writer) + { + writer.WriteString(GroupName); + writer.WriteString(OperatorName); + writer.WriteString(Source); + writer.WriteString(Destination); + writer.WriteInt32(Data.Length); + writer.WriteString(MsgType.ToString()); + + foreach (var data in Data) + { + _codec.Write(data, writer); + } + } /// <summary> - /// Returns the message data. + /// Read the class fields. /// </summary> - public byte[][] Data { get; private set; } + /// <param name="reader">The reader from which to read </param> + /// <param name="token">The cancellation token</param> + public override async System.Threading.Tasks.Task ReadAsync(IDataReader reader, CancellationToken token) + { + GroupName = await reader.ReadStringAsync(token); + OperatorName = await reader.ReadStringAsync(token); + Source = await reader.ReadStringAsync(token); + Destination = await reader.ReadStringAsync(token); + + int dataCount = await reader.ReadInt32Async(token); + + if (dataCount == 0) + { + throw new Exception("Data Count in Group COmmunication Message cannot be zero"); + } + + MsgType = (MessageType)Enum.Parse(typeof(MessageType), await reader.ReadStringAsync(token)); + Data = new T[dataCount]; + + for (int index = 0; index < dataCount; index++) + { + Data[index] = await _codec.ReadAsync(reader, token); + + if (Data[index] == null) + { + throw new Exception("message instance cannot be created from the IDataReader in Group Communication Message"); + } + } + } /// <summary> - /// Returns the type of message being sent. + /// Writes the class fields. /// </summary> - public MessageType MsgType { get; private set; } + /// <param name="writer">The writer to which to write</param> + /// <param name="token">The cancellation token</param> + public override async System.Threading.Tasks.Task WriteAsync(IDataWriter writer, CancellationToken token) + { + await writer.WriteStringAsync(GroupName, token); + await writer.WriteStringAsync(OperatorName, token); + await writer.WriteStringAsync(Source, token); + await writer.WriteStringAsync(Destination, token); + await writer.WriteInt32Async(Data.Length, token); + await writer.WriteStringAsync(MsgType.ToString(), token); + + foreach (var data in Data) + { + await _codec.WriteAsync(data, writer, token); + } + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs index b53929e..e7fcb52 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastReceiver.cs @@ -33,7 +33,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl /// Group Communication Operator used to receive broadcast messages in pipelined fashion. /// </summary> /// <typeparam name="T">The type of message being sent.</typeparam> - public class BroadcastReceiver<T> : IBroadcastReceiver<T> + public sealed class BroadcastReceiver<T> : IBroadcastReceiver<T> { private const int PipelineVersion = 2; private readonly IOperatorTopology<PipelineMessage<T>> _topology; @@ -64,7 +64,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl PipelineDataConverter = dataConverter; _topology = topology; - var msgHandler = Observer.Create<GroupCommunicationMessage>(message => topology.OnNext(message)); + var msgHandler = Observer.Create<GeneralGroupCommunicationMessage>(message => topology.OnNext(message)); networkHandler.Register(operatorName, msgHandler); if (initialize) http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs index 4af01c7..5457a70 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/BroadcastSender.cs @@ -33,7 +33,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl /// Group Communication Operator used to send messages to child Tasks in pipelined fashion. /// </summary> /// <typeparam name="T">The message type</typeparam> - public class BroadcastSender<T> : IBroadcastSender<T> + public sealed class BroadcastSender<T> : IBroadcastSender<T> { private static readonly Logger Logger = Logger.GetLogger(typeof(BroadcastSender<T>)); private const int PipelineVersion = 2; @@ -66,7 +66,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl Version = PipelineVersion; PipelineDataConverter = dataConverter; - var msgHandler = Observer.Create<GroupCommunicationMessage>(message => topology.OnNext(message)); + var msgHandler = Observer.Create<GeneralGroupCommunicationMessage>(message => topology.OnNext(message)); networkHandler.Register(operatorName, msgHandler); if (initialize) http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs index bea9c83..187bd58 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceReceiver.cs @@ -33,7 +33,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl /// Group Communication operator used to receive and reduce messages in pipelined fashion. /// </summary> /// <typeparam name="T">The message type</typeparam> - public class ReduceReceiver<T> : IReduceReceiver<T> + public sealed class ReduceReceiver<T> : IReduceReceiver<T> { private static readonly Logger Logger = Logger.GetLogger(typeof(ReduceReceiver<T>)); private const int PipelineVersion = 2; @@ -71,7 +71,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl _pipelinedReduceFunc = new PipelinedReduceFunction<T>(ReduceFunction); _topology = topology; - var msgHandler = Observer.Create<GroupCommunicationMessage>(message => topology.OnNext(message)); + var msgHandler = Observer.Create<GeneralGroupCommunicationMessage>(message => topology.OnNext(message)); networkHandler.Register(operatorName, msgHandler); if (initialize) http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs index 5f289da..0c13d3c 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ReduceSender.cs @@ -34,7 +34,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl /// Group Communication Operator used to send messages to be reduced by the ReduceReceiver in pipelined fashion. /// </summary> /// <typeparam name="T">The message type</typeparam> - public class ReduceSender<T> : IReduceSender<T> + public sealed class ReduceSender<T> : IReduceSender<T> { private static readonly Logger Logger = Logger.GetLogger(typeof(ReduceSender<T>)); private const int PipelineVersion = 2; @@ -72,7 +72,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl _pipelinedReduceFunc = new PipelinedReduceFunction<T>(ReduceFunction); _topology = topology; - var msgHandler = Observer.Create<GroupCommunicationMessage>(message => topology.OnNext(message)); + var msgHandler = Observer.Create<GeneralGroupCommunicationMessage>(message => topology.OnNext(message)); networkHandler.Register(operatorName, msgHandler); PipelineDataConverter = dataConverter; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs index a9f4b10..a21656d 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterReceiver.cs @@ -33,7 +33,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl /// from the IScatterSender. /// </summary> /// <typeparam name="T">The message type</typeparam> - public class ScatterReceiver<T> : IScatterReceiver<T> + public sealed class ScatterReceiver<T> : IScatterReceiver<T> { private const int DefaultVersion = 1; private readonly IOperatorTopology<T> _topology; @@ -60,7 +60,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl Version = DefaultVersion; _topology = topology; - var msgHandler = Observer.Create<GroupCommunicationMessage>(message => topology.OnNext(message)); + var msgHandler = Observer.Create<GeneralGroupCommunicationMessage>(message => topology.OnNext(message)); networkHandler.Register(operatorName, msgHandler); if (initialize) http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs index 80bc84e..42e7f6b 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/ScatterSender.cs @@ -33,7 +33,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl /// of the IScatterReceivers. /// </summary> /// <typeparam name="T">The message type</typeparam> - public class ScatterSender<T> : IScatterSender<T> + public sealed class ScatterSender<T> : IScatterSender<T> { private const int DefaultVersion = 1; private readonly IOperatorTopology<T> _topology; @@ -60,7 +60,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl Version = DefaultVersion; _topology = topology; - var msgHandler = Observer.Create<GroupCommunicationMessage>(message => topology.OnNext(message)); + var msgHandler = Observer.Create<GeneralGroupCommunicationMessage>(message => topology.OnNext(message)); networkHandler.Register(operatorName, msgHandler); if (initialize) http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/Sender.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/Sender.cs b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/Sender.cs index 7b37c07..a78b13c 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/Sender.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Operators/Impl/Sender.cs @@ -27,10 +27,12 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl { /// <summary> /// Group Communication operator used to do point-to-point communication between named Tasks. + /// It uses Writable classes /// </summary> - public class Sender + [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)] + public sealed class Sender { - private readonly INetworkService<GroupCommunicationMessage> _networkService; + private readonly INetworkService<GeneralGroupCommunicationMessage> _networkService; private readonly IIdentifierFactory _idFactory; /// <summary> @@ -39,8 +41,8 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl /// <param name="networkService">The network services used to send messages.</param> /// <param name="idFactory">Used to create IIdentifier for GroupCommunicationMessages.</param> [Inject] - public Sender( - NetworkService<GroupCommunicationMessage> networkService, + private Sender( + WritableNetworkService<GeneralGroupCommunicationMessage> networkService, IIdentifierFactory idFactory) { _networkService = networkService; @@ -52,7 +54,7 @@ namespace Org.Apache.REEF.Network.Group.Operators.Impl /// included in the message. /// </summary> /// <param name="message">The message to send.</param> - public void Send(GroupCommunicationMessage message) + public void Send(GeneralGroupCommunicationMessage message) { if (message == null) { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs index e4f5b40..96c36e1 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/ICommunicationGroupNetworkObserver.cs @@ -26,12 +26,14 @@ namespace Org.Apache.REEF.Network.Group.Task { /// <summary> /// Handles incoming messages sent to this Communication Group. + /// Writable Version /// </summary> [DefaultImplementation(typeof(CommunicationGroupNetworkObserver))] - public interface ICommunicationGroupNetworkObserver : IObserver<GroupCommunicationMessage> + // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295. + public interface ICommunicationGroupNetworkObserver : IObserver<GeneralGroupCommunicationMessage> { /// <summary> - /// Registers the handler with the CommunicationGroupNetworkObserver. + /// Registers the handler with the WritableCommunicationGroupNetworkObserver. /// Messages that are to be sent to the operator specified by operatorName /// are handled by the given observer. /// </summary> @@ -39,6 +41,6 @@ namespace Org.Apache.REEF.Network.Group.Task /// will be invoked</param> /// <param name="observer">The handler to invoke when messages are sent /// to the operator specified by operatorName</param> - void Register(string operatorName, IObserver<GroupCommunicationMessage> observer); + void Register(string operatorName, IObserver<GeneralGroupCommunicationMessage> observer); } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommNetworkObserver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommNetworkObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommNetworkObserver.cs index 8416cca..de19754 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommNetworkObserver.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/IGroupCommNetworkObserver.cs @@ -27,9 +27,11 @@ namespace Org.Apache.REEF.Network.Group.Task { /// <summary> /// Handles all incoming messages for this Task. + /// Writable Version /// </summary> [DefaultImplementation(typeof(GroupCommNetworkObserver))] - public interface IGroupCommNetworkObserver : IObserver<NsMessage<GroupCommunicationMessage>> + // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295. + public interface IGroupCommNetworkObserver : IObserver<WritableNsMessage<GeneralGroupCommunicationMessage>> { /// <summary> /// Registers the network handler for the given CommunicationGroup. @@ -39,6 +41,6 @@ namespace Org.Apache.REEF.Network.Group.Task /// <param name="groupName">The group name for the network handler</param> /// <param name="commGroupHandler">The network handler to invoke when /// messages are sent to the given group.</param> - void Register(string groupName, IObserver<GroupCommunicationMessage> commGroupHandler); + void Register(string groupName, IObserver<GeneralGroupCommunicationMessage> commGroupHandler); } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs index 30dddcf..5be1457 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/CommunicationGroupNetworkObserver.cs @@ -19,8 +19,6 @@ using System; using System.Collections.Generic; -using System.Threading; -using Org.Apache.REEF.Network.Group.Config; using Org.Apache.REEF.Network.Group.Driver.Impl; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Utilities.Diagnostics; @@ -30,19 +28,21 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl { /// <summary> /// Handles incoming messages sent to this Communication Group. + /// Writable version /// </summary> - public class CommunicationGroupNetworkObserver : ICommunicationGroupNetworkObserver + // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295. + public sealed class CommunicationGroupNetworkObserver : ICommunicationGroupNetworkObserver { private static readonly Logger LOGGER = Logger.GetLogger(typeof(CommunicationGroupNetworkObserver)); - private readonly Dictionary<string, IObserver<GroupCommunicationMessage>> _handlers; + private readonly Dictionary<string, IObserver<GeneralGroupCommunicationMessage>> _handlers; /// <summary> /// Creates a new CommunicationGroupNetworkObserver. /// </summary> [Inject] - public CommunicationGroupNetworkObserver() + private CommunicationGroupNetworkObserver() { - _handlers = new Dictionary<string, IObserver<GroupCommunicationMessage>>(); + _handlers = new Dictionary<string, IObserver<GeneralGroupCommunicationMessage>>(); } /// <summary> @@ -52,9 +52,9 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// </summary> /// <param name="operatorName">The name of the operator whose handler /// will be invoked</param> - /// <param name="observer">The handler to invoke when messages are sent + /// <param name="observer">The writable handler to invoke when messages are sent /// to the operator specified by operatorName</param> - public void Register(string operatorName, IObserver<GroupCommunicationMessage> observer) + public void Register(string operatorName, IObserver<GeneralGroupCommunicationMessage> observer) { if (string.IsNullOrEmpty(operatorName)) { @@ -69,16 +69,15 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl } /// <summary> - /// Handles the incoming GroupCommunicationMessage sent to this Communication Group. + /// Handles the incoming GeneralGroupCommunicationMessage sent to this Communication Group. /// Looks for the operator that the message is being sent to and invoke its handler. /// </summary> /// <param name="message">The incoming message</param> - public void OnNext(GroupCommunicationMessage message) + public void OnNext(GeneralGroupCommunicationMessage message) { string operatorName = message.OperatorName; - IObserver<GroupCommunicationMessage> handler = GetOperatorHandler(operatorName); - + IObserver<GeneralGroupCommunicationMessage> handler = GetOperatorHandler(operatorName); if (handler == null) { Exceptions.Throw(new ArgumentException("No handler registered with the operator name: " + operatorName), LOGGER); @@ -94,14 +93,15 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// </summary> /// <param name="operatorName"></param> /// <returns></returns> - private IObserver<GroupCommunicationMessage> GetOperatorHandler(string operatorName) + private IObserver<GeneralGroupCommunicationMessage> GetOperatorHandler(string operatorName) { - IObserver<GroupCommunicationMessage> handler; + IObserver<GeneralGroupCommunicationMessage> handler; if (!_handlers.TryGetValue(operatorName, out handler)) { Exceptions.Throw(new ApplicationException("No handler registered yet with the operator name: " + operatorName), LOGGER); } return handler; + } public void OnError(Exception error) http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs index 4cf0e06..8a8e696 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommClient.cs @@ -31,28 +31,30 @@ using Org.Apache.REEF.Wake.Remote.Impl; namespace Org.Apache.REEF.Network.Group.Task.Impl { /// <summary> - /// Container of ommunicationGroupClients + /// Used by Tasks to fetch CommunicationGroupClients. + /// Writable version /// </summary> - public class GroupCommClient : IGroupCommClient + // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295. + public sealed class GroupCommClient : IGroupCommClient { private readonly Dictionary<string, ICommunicationGroupClient> _commGroups; - private readonly INetworkService<GroupCommunicationMessage> _networkService; + private readonly INetworkService<GeneralGroupCommunicationMessage> _networkService; /// <summary> - /// Creates a new GroupCommClient and registers the task ID with the Name Server. - /// Currently the GroupCommClient is injected in task constructor. When work with REEF-289, we should put the injection at a proepr palce. + /// Creates a new WritableGroupCommClient and registers the task ID with the Name Server. /// </summary> /// <param name="groupConfigs">The set of serialized Group Communication configurations</param> - /// <param name="taskId">The identifier for this taskfor this task</param> - /// <param name="networkService">The network service used to send messages</param> + /// <param name="taskId">The identifier for this task</param> + /// <param name="networkService">The writable network service used to send messages</param> /// <param name="configSerializer">Used to deserialize Group Communication configuration</param> /// <param name="injector">injector forked from the injector that creates this instance</param> [Inject] - private GroupCommClient( + [Obsolete("Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295 ", false)] + public GroupCommClient( [Parameter(typeof(GroupCommConfigurationOptions.SerializedGroupConfigs))] ISet<string> groupConfigs, [Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId, - NetworkService<GroupCommunicationMessage> networkService, + WritableNetworkService<GeneralGroupCommunicationMessage> networkService, AvroConfigurationSerializer configSerializer, IInjector injector) { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommNetworkObserver.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommNetworkObserver.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommNetworkObserver.cs index 5604885..9d35ff1 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommNetworkObserver.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/GroupCommNetworkObserver.cs @@ -29,29 +29,31 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl { /// <summary> /// Handles all incoming messages for this Task. + /// Writable version /// </summary> - public class GroupCommNetworkObserver : IGroupCommNetworkObserver + // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295. + public sealed class GroupCommNetworkObserver : IGroupCommNetworkObserver { private static readonly Logger LOGGER = Logger.GetLogger(typeof(GroupCommNetworkObserver)); - private readonly Dictionary<string, IObserver<GroupCommunicationMessage>> _commGroupHandlers; - + private readonly Dictionary<string, IObserver<GeneralGroupCommunicationMessage>> _commGroupHandlers; + /// <summary> /// Creates a new GroupCommNetworkObserver. /// </summary> [Inject] public GroupCommNetworkObserver() { - _commGroupHandlers = new Dictionary<string, IObserver<GroupCommunicationMessage>>(); + _commGroupHandlers = new Dictionary<string, IObserver<GeneralGroupCommunicationMessage>>(); } /// <summary> - /// Handles the incoming NsMessage for this Task. - /// Delegates the GroupCommunicationMessage to the correct - /// CommunicationGroupNetworkObserver. + /// Handles the incoming WritableNsMessage for this Task. + /// Delegates the GeneralGroupCommunicationMessage to the correct + /// WritableCommunicationGroupNetworkObserver. /// </summary> /// <param name="nsMessage"></param> - public void OnNext(NsMessage<GroupCommunicationMessage> nsMessage) + public void OnNext(WritableNsMessage<GeneralGroupCommunicationMessage> nsMessage) { if (nsMessage == null) { @@ -60,7 +62,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl try { - GroupCommunicationMessage gcm = nsMessage.Data.First(); + GeneralGroupCommunicationMessage gcm = nsMessage.Data.First(); _commGroupHandlers[gcm.GroupName].OnNext(gcm); } catch (InvalidOperationException) @@ -83,7 +85,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// <param name="groupName">The group name for the network handler</param> /// <param name="commGroupHandler">The network handler to invoke when /// messages are sent to the given group.</param> - public void Register(string groupName, IObserver<GroupCommunicationMessage> commGroupHandler) + public void Register(string groupName, IObserver<GeneralGroupCommunicationMessage> commGroupHandler) { if (string.IsNullOrEmpty(groupName)) { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs index 013f76b..aa5de1e 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/NodeStruct.cs @@ -17,17 +17,22 @@ * under the License. */ +using System; using System.Collections.Concurrent; using Org.Apache.REEF.Network.Group.Driver.Impl; namespace Org.Apache.REEF.Network.Group.Task.Impl { + /// <summary> /// Stores all incoming messages sent by a particular Task. + /// Writable version /// </summary> - internal class NodeStruct + /// <typeparam name="T"> Generic type of message</typeparam> + // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295. + internal sealed class NodeStruct<T> { - private readonly BlockingCollection<GroupCommunicationMessage> _messageQueue; + private readonly BlockingCollection<GroupCommunicationMessage<T>> _messageQueue; /// <summary> /// Creates a new NodeStruct. @@ -36,7 +41,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl public NodeStruct(string id) { Identifier = id; - _messageQueue = new BlockingCollection<GroupCommunicationMessage>(); + _messageQueue = new BlockingCollection<GroupCommunicationMessage<T>>(); } /// <summary> @@ -49,7 +54,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// Gets the first message in the message queue. /// </summary> /// <returns>The first available message.</returns> - public byte[][] GetData() + public T[] GetData() { return _messageQueue.Take().Data; } @@ -58,7 +63,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// Adds an incoming message to the message queue. /// </summary> /// <param name="gcm">The incoming message</param> - public void AddData(GroupCommunicationMessage gcm) + public void AddData(GroupCommunicationMessage<T> gcm) { _messageQueue.Add(gcm); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs index dd11402..67e1da9 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Task/Impl/OperatorTopology.cs @@ -18,7 +18,6 @@ */ using System; -using System.Collections; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; @@ -30,20 +29,22 @@ using Org.Apache.REEF.Network.Group.Driver.Impl; using Org.Apache.REEF.Network.Group.Operators; using Org.Apache.REEF.Network.Group.Operators.Impl; using Org.Apache.REEF.Network.NetworkService; +using Org.Apache.REEF.Network.StreamingCodec; using Org.Apache.REEF.Tang.Annotations; using Org.Apache.REEF.Tang.Exceptions; using Org.Apache.REEF.Utilities.Logging; -using Org.Apache.REEF.Wake.Remote; namespace Org.Apache.REEF.Network.Group.Task.Impl { /// <summary> /// Contains the Operator's topology graph. + /// Writable version /// Used to send or receive messages to/from operators in the same /// Communication Group. /// </summary> /// <typeparam name="T">The message type</typeparam> - public class OperatorTopology<T> : IOperatorTopology<T>, IObserver<GroupCommunicationMessage> + // TODO: Need to remove Iwritable and use IstreamingCodec. Please see Jira REEF-295. + public sealed class OperatorTopology<T> : IOperatorTopology<T>, IObserver<GeneralGroupCommunicationMessage> { private const int DefaultTimeout = 50000; private const int RetryCount = 10; @@ -57,14 +58,14 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl private readonly int _timeout; private readonly int _retryCount; - private readonly NodeStruct _parent; - private readonly List<NodeStruct> _children; - private readonly Dictionary<string, NodeStruct> _idToNodeMap; - private readonly ICodec<T> _codec; + private readonly NodeStruct<T> _parent; + private readonly List<NodeStruct<T>> _children; + private readonly Dictionary<string, NodeStruct<T>> _idToNodeMap; private readonly INameClient _nameClient; private readonly Sender _sender; - private readonly BlockingCollection<NodeStruct> _nodesWithData; + private readonly BlockingCollection<NodeStruct<T>> _nodesWithData; private readonly Object _thisLock = new Object(); + private readonly IStreamingCodec<T> _codec; /// <summary> /// Creates a new OperatorTopology object. @@ -73,37 +74,39 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// <param name="groupName">The name of the operator's Communication Group</param> /// <param name="taskId">The operator's Task identifier</param> /// <param name="driverId">The identifer for the driver</param> + /// <param name="timeout">Timeout value for cancellation token</param> + /// <param name="retryCount">Number of times to retry registration</param> /// <param name="rootId">The identifier for the root Task in the topology graph</param> /// <param name="childIds">The set of child Task identifiers in the topology graph</param> /// <param name="networkService">The network service</param> - /// <param name="codec">The codec used to serialize and deserialize messages</param> /// <param name="sender">The Sender used to do point to point communication</param> + /// <param name="codec">Streaming codec to encode objects</param> [Inject] - public OperatorTopology( + private OperatorTopology( [Parameter(typeof(GroupCommConfigurationOptions.OperatorName))] string operatorName, [Parameter(typeof(GroupCommConfigurationOptions.CommunicationGroupName))] string groupName, [Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId, [Parameter(typeof(GroupCommConfigurationOptions.DriverId))] string driverId, - [Parameter(typeof(GroupCommConfigurationOptions.Timeout))] int timrout, + [Parameter(typeof(GroupCommConfigurationOptions.Timeout))] int timeout, [Parameter(typeof(GroupCommConfigurationOptions.RetryCount))] int retryCount, [Parameter(typeof(GroupCommConfigurationOptions.TopologyRootTaskId))] string rootId, [Parameter(typeof(GroupCommConfigurationOptions.TopologyChildTaskIds))] ISet<string> childIds, - NetworkService<GroupCommunicationMessage> networkService, - ICodec<T> codec, - Sender sender) + WritableNetworkService<GeneralGroupCommunicationMessage> networkService, + Sender sender, + IStreamingCodec<T> codec) { _operatorName = operatorName; _groupName = groupName; _selfId = taskId; _driverId = driverId; - _timeout = timrout; + _timeout = timeout; _retryCount = retryCount; - _codec = codec; _nameClient = networkService.NamingClient; _sender = sender; - _nodesWithData = new BlockingCollection<NodeStruct>(); - _children = new List<NodeStruct>(); - _idToNodeMap = new Dictionary<string, NodeStruct>(); + _nodesWithData = new BlockingCollection<NodeStruct<T>>(); + _children = new List<NodeStruct<T>>(); + _idToNodeMap = new Dictionary<string, NodeStruct<T>>(); + _codec = codec; if (_selfId.Equals(rootId)) { @@ -111,12 +114,12 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl } else { - _parent = new NodeStruct(rootId); + _parent = new NodeStruct<T>(rootId); _idToNodeMap[rootId] = _parent; } foreach (var childId in childIds) { - var node = new NodeStruct(childId); + var node = new NodeStruct<T>(childId); _children.Add(node); _idToNodeMap[childId] = node; } @@ -151,7 +154,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// Updates the sending node's message queue. /// </summary> /// <param name="gcm">The incoming message</param> - public void OnNext(GroupCommunicationMessage gcm) + public void OnNext(GeneralGroupCommunicationMessage gcm) { if (gcm == null) { @@ -171,7 +174,14 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl lock (_thisLock) { _nodesWithData.Add(sourceNode); - sourceNode.AddData(gcm); + var message = gcm as GroupCommunicationMessage<T>; + + if (message == null) + { + throw new NullReferenceException("message passed not of type GroupCommunicationMessage"); + } + + sourceNode.AddData(message); } } @@ -204,7 +214,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl foreach (var child in _children) { - SendToNode(message, MessageType.Data, child); + SendToNode(message, MessageType.Data, child); } } @@ -218,14 +228,14 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl { if (messages == null) { - throw new ArgumentNullException("messages"); + throw new ArgumentNullException("messages"); } if (_children.Count <= 0) { return; } - var count = (int) Math.Ceiling(((double) messages.Count) / _children.Count); + var count = (int)Math.Ceiling(((double)messages.Count) / _children.Count); ScatterHelper(messages, _children, count); } @@ -268,10 +278,10 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl throw new ArgumentException("order cannot be null and must have the same number of elements as child tasks"); } - List<NodeStruct> nodes = new List<NodeStruct>(); + List<NodeStruct<T>> nodes = new List<NodeStruct<T>>(); foreach (string taskId in order) { - NodeStruct node = FindNode(taskId); + NodeStruct<T> node = FindNode(taskId); if (node == null) { throw new IllegalStateException("Received message from invalid task id: " + taskId); @@ -280,7 +290,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl nodes.Add(node); } - int count = (int) Math.Ceiling(((double) messages.Count) / _children.Count); + int count = (int)Math.Ceiling(((double)messages.Count) / _children.Count); ScatterHelper(messages, nodes, count); } @@ -290,28 +300,24 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// <returns>The parent Task's message</returns> public T ReceiveFromParent() { - byte[][] data = ReceiveFromNode(_parent); + T[] data = ReceiveFromNode(_parent); if (data == null || data.Length != 1) { throw new InvalidOperationException("Cannot receive data from parent node"); } - return _codec.Decode(data[0]); + return data[0]; } - /// <summary> - /// Receive a list of incoming messages from the parent Task. - /// </summary> - /// <returns>The parent Task's list of messages</returns> public IList<T> ReceiveListFromParent() { - byte[][] data = ReceiveFromNode(_parent); + T[] data = ReceiveFromNode(_parent); if (data == null || data.Length == 0) { throw new InvalidOperationException("Cannot receive data from parent node"); } - return data.Select(b => _codec.Decode(b)).ToList(); + return data.ToList(); } /// <summary> @@ -336,13 +342,13 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl foreach (var child in childrenWithData) { - byte[][] data = ReceiveFromNode(child); + T[] data = ReceiveFromNode(child); if (data == null || data.Length != 1) { throw new InvalidOperationException("Received invalid data from child with id: " + child.Identifier); } - receivedData.Add(_codec.Decode(data[0])); + receivedData.Add(data[0]); childrenToReceiveFrom.Remove(child.Identifier); } } @@ -368,10 +374,10 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// </summary> ///<param name="nodeSetIdentifier">Candidate set of nodes from which data is to be received</param> /// <returns>A Vector of NodeStruct with incoming data.</returns> - private IEnumerable<NodeStruct> GetNodeWithData(IEnumerable<string> nodeSetIdentifier) + private IEnumerable<NodeStruct<T>> GetNodeWithData(IEnumerable<string> nodeSetIdentifier) { CancellationTokenSource timeoutSource = new CancellationTokenSource(_timeout); - List<NodeStruct> nodesSubsetWithData = new List<NodeStruct>(); + List<NodeStruct<T>> nodesSubsetWithData = new List<NodeStruct<T>>(); try { @@ -408,7 +414,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl potentialNode = _nodesWithData.Take(); } - return new NodeStruct[] { potentialNode }; + return new NodeStruct<T>[] { potentialNode }; } catch (OperationCanceledException) @@ -434,10 +440,10 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// <param name="message">The message to send</param> /// <param name="msgType">The message type</param> /// <param name="node">The NodeStruct representing the Task to send to</param> - private void SendToNode(T message, MessageType msgType, NodeStruct node) + private void SendToNode(T message, MessageType msgType, NodeStruct<T> node) { - GroupCommunicationMessage gcm = new GroupCommunicationMessage(_groupName, _operatorName, - _selfId, node.Identifier, _codec.Encode(message), msgType); + GeneralGroupCommunicationMessage gcm = new GroupCommunicationMessage<T>(_groupName, _operatorName, + _selfId, node.Identifier, message, msgType, _codec); _sender.Send(gcm); } @@ -448,16 +454,40 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl /// <param name="messages">The list of messages to send</param> /// <param name="msgType">The message type</param> /// <param name="node">The NodeStruct representing the Task to send to</param> - private void SendToNode(IList<T> messages, MessageType msgType, NodeStruct node) + private void SendToNode(IList<T> messages, MessageType msgType, NodeStruct<T> node) { - byte[][] encodedMessages = messages.Select(message => _codec.Encode(message)).ToArray(); - GroupCommunicationMessage gcm = new GroupCommunicationMessage(_groupName, _operatorName, - _selfId, node.Identifier, encodedMessages, msgType); + T[] encodedMessages = messages.ToArray(); + + GroupCommunicationMessage<T> gcm = new GroupCommunicationMessage<T>(_groupName, _operatorName, + _selfId, node.Identifier, encodedMessages, msgType, _codec); _sender.Send(gcm); } - private void ScatterHelper(IList<T> messages, List<NodeStruct> order, int count) + /// <summary> + /// Receive a message from the Task represented by the given NodeStruct. + /// Removes the NodeStruct from the nodesWithData queue if requested. + /// </summary> + /// <param name="node">The node to receive from</param> + /// <returns>The byte array message from the node</returns> + private T[] ReceiveFromNode(NodeStruct<T> node) + { + var data = node.GetData(); + return data; + } + + /// <summary> + /// Find the NodeStruct with the given Task identifier. + /// </summary> + /// <param name="identifier">The identifier of the Task</param> + /// <returns>The NodeStruct</returns> + private NodeStruct<T> FindNode(string identifier) + { + NodeStruct<T> node; + return _idToNodeMap.TryGetValue(identifier, out node) ? node : null; + } + + private void ScatterHelper(IList<T> messages, List<NodeStruct<T>> order, int count) { if (count <= 0) { @@ -465,7 +495,7 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl } int i = 0; - foreach (NodeStruct nodeStruct in order) + foreach (var nodeStruct in order) { // The last sublist might be smaller than count if the number of // child tasks is not evenly divisible by count @@ -484,29 +514,6 @@ namespace Org.Apache.REEF.Network.Group.Task.Impl } /// <summary> - /// Receive a message from the Task represented by the given NodeStruct. - /// Removes the NodeStruct from the nodesWithData queue if requested. - /// </summary> - /// <param name="node">The node to receive from</param> - /// <returns>The byte array message from the node</returns> - private byte[][] ReceiveFromNode(NodeStruct node) - { - byte[][] data = node.GetData(); - return data; - } - - /// <summary> - /// Find the NodeStruct with the given Task identifier. - /// </summary> - /// <param name="identifier">The identifier of the Task</param> - /// <returns>The NodeStruct</returns> - private NodeStruct FindNode(string identifier) - { - NodeStruct node; - return _idToNodeMap.TryGetValue(identifier, out node) ? node : null; - } - - /// <summary> /// Checks if the identifier is registered with the Name Server. /// Throws exception if the operation fails more than the retry count. /// </summary> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs b/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs index c36f1ca..b182a39 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Topology/FlatTopology.cs @@ -25,8 +25,6 @@ using Org.Apache.REEF.Network.Group.Operators.Impl; using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Tang.Util; -using Org.Apache.REEF.Wake.Remote; -using Org.Apache.REEF.Network.Group.Pipelining; using Org.Apache.REEF.Tang.Implementations.Configuration; namespace Org.Apache.REEF.Network.Group.Topology http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Topology/ITopology.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Topology/ITopology.cs b/lang/cs/Org.Apache.REEF.Network/Group/Topology/ITopology.cs index 0c7f164..0bd46a2 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Topology/ITopology.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Topology/ITopology.cs @@ -19,7 +19,6 @@ using Org.Apache.REEF.Network.Group.Operators; using Org.Apache.REEF.Tang.Interface; -using Org.Apache.REEF.Wake.Remote; namespace Org.Apache.REEF.Network.Group.Topology { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs b/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs index d6c6bc6..3e76b64 100644 --- a/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs +++ b/lang/cs/Org.Apache.REEF.Network/Group/Topology/TreeTopology.cs @@ -25,18 +25,12 @@ using Org.Apache.REEF.Network.Group.Operators.Impl; using Org.Apache.REEF.Tang.Implementations.Tang; using Org.Apache.REEF.Tang.Interface; using Org.Apache.REEF.Tang.Util; -using Org.Apache.REEF.Wake.Remote; -using Org.Apache.REEF.Network.Group.Pipelining; -using Org.Apache.REEF.Network.Group.Task.Impl; using Org.Apache.REEF.Tang.Implementations.Configuration; -using Org.Apache.REEF.Utilities.Logging; namespace Org.Apache.REEF.Network.Group.Topology { public class TreeTopology<T> : ITopology<T> { - private readonly Logger LOGGER = Logger.GetLogger(typeof(TreeTopology<T>)); - private readonly string _groupName; private readonly string _operatorName; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsMessage.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsMessage.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsMessage.cs index 28cd5f9..a9299bb 100644 --- a/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsMessage.cs +++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/WritableNsMessage.cs @@ -101,12 +101,13 @@ namespace Org.Apache.REEF.Network.NetworkService SourceId = _factory.Create(reader.ReadString()); DestId = _factory.Create(reader.ReadString()); int messageCount = reader.ReadInt32(); + string dataType = reader.ReadString(); Data = new List<T>(); for (int index = 0; index < messageCount; index++) { - var dataPoint = (T)_injection.ForkInjector().GetInstance(typeof(T)); + var dataPoint = (T)_injection.ForkInjector().GetInstance(Type.GetType(dataType)); if (null == dataPoint) { @@ -127,6 +128,7 @@ namespace Org.Apache.REEF.Network.NetworkService writer.WriteString(SourceId.ToString()); writer.WriteString(DestId.ToString()); writer.WriteInt32(Data.Count); + writer.WriteString(Data[0].GetType().AssemblyQualifiedName); foreach (var data in Data) { @@ -144,12 +146,13 @@ namespace Org.Apache.REEF.Network.NetworkService SourceId = _factory.Create(await reader.ReadStringAsync(token)); DestId = _factory.Create(await reader.ReadStringAsync(token)); int messageCount = await reader.ReadInt32Async(token); + string dataType = await reader.ReadStringAsync(token); Data = new List<T>(); for (int index = 0; index < messageCount; index++) { - var dataPoint = Activator.CreateInstance<T>(); + var dataPoint = (T) _injection.ForkInjector().GetInstance(Type.GetType(dataType)); if (null == dataPoint) { @@ -171,6 +174,7 @@ namespace Org.Apache.REEF.Network.NetworkService await writer.WriteStringAsync(SourceId.ToString(), token); await writer.WriteStringAsync(DestId.ToString(), token); await writer.WriteInt32Async(Data.Count, token); + await writer.WriteStringAsync(Data[0].GetType().AssemblyQualifiedName, token); foreach (var data in Data) { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj index 6a8fa0b..390bbb4 100644 --- a/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj +++ b/lang/cs/Org.Apache.REEF.Network/Org.Apache.REEF.Network.csproj @@ -50,14 +50,15 @@ under the License. <Reference Include="System.Xml" /> </ItemGroup> <ItemGroup> + <Compile Include="Group\Config\CodecToStreamingCodecConfiguration.cs" /> + <Compile Include="Group\Driver\Impl\GeneralGroupCommunicationMessage.cs" /> <Compile Include="StreamingCodec\CommonStreamingCodecs\FloatArrayStreamingCodec.cs" /> <Compile Include="StreamingCodec\CommonStreamingCodecs\DoubleArrayStreamingCodec.cs" /> <Compile Include="StreamingCodec\CommonStreamingCodecs\FloatStreamingCodec.cs" /> <Compile Include="StreamingCodec\CommonStreamingCodecs\IntArrayStreamingCodec.cs" /> <Compile Include="StreamingCodec\CommonStreamingCodecs\DoubleStreamingCodec.cs" /> + <Compile Include="StreamingCodec\CommonStreamingCodecs\StringStreamingCodec.cs" /> <Compile Include="StreamingCodec\CommonStreamingCodecs\IntStreamingCodec.cs" /> - <Compile Include="Group\Codec\GcmMessageProto.cs" /> - <Compile Include="Group\Codec\GroupCommunicationMessageCodec.cs" /> <Compile Include="Group\Config\CodecConfiguration.cs" /> <Compile Include="Group\Config\GroupCommConfigurationOptions.cs" /> <Compile Include="Group\Config\PipelineDataConverterConfiguration.cs" /> http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/1d200334/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/StringStreamingCodec.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/StringStreamingCodec.cs b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/StringStreamingCodec.cs new file mode 100644 index 0000000..63036f5 --- /dev/null +++ b/lang/cs/Org.Apache.REEF.Network/StreamingCodec/CommonStreamingCodecs/StringStreamingCodec.cs @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +using System.Threading; +using System.Threading.Tasks; +using Org.Apache.REEF.Tang.Annotations; +using Org.Apache.REEF.Wake.Remote; + +namespace Org.Apache.REEF.Network.StreamingCodec.CommonStreamingCodecs +{ + /// <summary> + /// Streaming codec for string + /// </summary> + public sealed class StringStreamingCodec : IStreamingCodec<string> + { + /// <summary> + /// Injectable constructor + /// </summary> + [Inject] + private StringStreamingCodec() + { + } + + /// <summary> + /// Instantiate the class from the reader. + /// </summary> + /// <param name="reader">The reader from which to read</param> + ///<returns>The string read from the reader</returns> + public string Read(IDataReader reader) + { + return reader.ReadString(); + } + + /// <summary> + /// Writes the string to the writer. + /// </summary> + /// <param name="obj">The string to be encoded</param> + /// <param name="writer">The writer to which to write</param> + public void Write(string obj, IDataWriter writer) + { + writer.WriteString(obj); + } + + /// <summary> + /// Instantiate the class from the reader. + /// </summary> + /// <param name="reader">The reader from which to read</param> + /// <param name="token">Cancellation token</param> + /// <returns>The string read from the reader</returns> + public async Task<string> ReadAsync(IDataReader reader, CancellationToken token) + { + return await reader.ReadStringAsync(token); + } + + /// <summary> + /// Writes the string to the writer. + /// </summary> + /// <param name="obj">The string to be encoded</param> + /// <param name="writer">The writer to which to write</param> + /// <param name="token">Cancellation token</param> + public async Task WriteAsync(string obj, IDataWriter writer, CancellationToken token) + { + await writer.WriteStringAsync(obj, token); + } + } +}
