Repository: incubator-reef
Updated Branches:
  refs/heads/master b9bb7b139 -> c02c80dac


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedBroadcastReduceDriver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedBroadcastReduceDriver.cs
 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedBroadcastReduceDriver.cs
new file mode 100644
index 0000000..98a68dd
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedBroadcastReduceDriver.cs
@@ -0,0 +1,320 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+using System.Linq;
+using Org.Apache.REEF.Common.Io;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Bridge;
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Pipelining;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Remote;
+using Org.Apache.REEF.Network.Group.Topology;
+
+namespace Org.Apache.REEF.Tests.Functional.MPI.PipelinedBroadcastReduceTest
+{
+    public class PipelinedBroadcastReduceDriver : IStartHandler, 
IObserver<IEvaluatorRequestor>, IObserver<IAllocatedEvaluator>, 
IObserver<IActiveContext>, IObserver<IFailedEvaluator>
+    {
+        private static readonly Logger Logger = 
Logger.GetLogger(typeof(PipelinedBroadcastReduceDriver));
+
+        private readonly int _numEvaluators;
+        private readonly int _numIterations;
+        private readonly int _chunkSize;
+
+        private readonly IMpiDriver _mpiDriver;
+        private readonly ICommunicationGroupDriver _commGroup;
+        private readonly TaskStarter _mpiTaskStarter;
+
+        /// <summary>
+        /// Builds the driver: stores the injected settings, adds a broadcast and a reduce
+        /// operator (both with a tree topology and a chunking data converter) to the MPI
+        /// driver's default communication group, and creates the task starter.
+        /// </summary>
+        /// <param name="numEvaluators">Value injected for MpiTestConfig.NumEvaluators.</param>
+        /// <param name="numIterations">Value injected for MpiTestConfig.NumIterations.</param>
+        /// <param name="chunkSize">Value injected for MpiTestConfig.ChunkSize; handed to both data converters.</param>
+        /// <param name="mpiDriver">MPI driver whose DefaultGroup the operators are added to.</param>
+        [Inject]
+        public PipelinedBroadcastReduceDriver(
+            [Parameter(typeof (MpiTestConfig.NumEvaluators))] int numEvaluators,
+            [Parameter(typeof (MpiTestConfig.NumIterations))] int numIterations,
+            [Parameter(typeof (MpiTestConfig.ChunkSize))] int chunkSize,
+            MpiDriver mpiDriver)
+        {
+            Logger.Log(Level.Info, "*******entering the driver code " + chunkSize);
+
+            _numEvaluators = numEvaluators;
+            _numIterations = numIterations;
+            _chunkSize = chunkSize;
+            _mpiDriver = mpiDriver;
+            Identifier = "BroadcastStartHandler";
+
+            // Step 1: broadcast operator, registered under the master task id.
+            var groupWithBroadcast = _mpiDriver.DefaultGroup
+                .AddBroadcast<int[], IntArrayCodec>(
+                    MpiTestConstants.BroadcastOperatorName,
+                    MpiTestConstants.MasterTaskId,
+                    TopologyTypes.Tree,
+                    new PipelineIntDataConverter(_chunkSize));
+
+            // Step 2: reduce operator that sums the arrays element-wise.
+            var groupWithReduce = groupWithBroadcast
+                .AddReduce<int[], IntArrayCodec>(
+                    MpiTestConstants.ReduceOperatorName,
+                    MpiTestConstants.MasterTaskId,
+                    new ArraySumFunction(),
+                    TopologyTypes.Tree,
+                    new PipelineIntDataConverter(_chunkSize));
+
+            _commGroup = groupWithReduce.Build();
+
+            _mpiTaskStarter = new TaskStarter(_mpiDriver, numEvaluators);
+
+            CreateClassHierarchy();
+        }
+
+        public string Identifier { get; set; }
+
+        /// <summary>
+        /// Submits a single request for <c>_numEvaluators</c> evaluators.
+        /// </summary>
+        /// <param name="evaluatorRequestor">Requestor the evaluator request is submitted to.</param>
+        public void OnNext(IEvaluatorRequestor evaluatorRequestor)
+        {
+            // NOTE(review): 512 and 2 are presumably memory (MB) and core count - confirm
+            // against the EvaluatorRequest constructor.
+            evaluatorRequestor.Submit(
+                new EvaluatorRequest(_numEvaluators, 512, 2, "WonderlandRack", "BroadcastEvaluator"));
+        }
+
+        /// <summary>
+        /// Submits the context and service configurations obtained from the MPI driver
+        /// to the newly allocated evaluator.
+        /// </summary>
+        /// <param name="allocatedEvaluator">Evaluator that receives the context and service.</param>
+        public void OnNext(IAllocatedEvaluator allocatedEvaluator)
+        {
+            allocatedEvaluator.SubmitContextAndService(
+                _mpiDriver.GetContextConfiguration(),
+                _mpiDriver.GetServiceConfiguration());
+        }
+
+        /// <summary>
+        /// Configures and queues the task for a newly active context: the master task when
+        /// the MPI driver reports the context as the master task context, a slave task
+        /// (named "SlaveTask-" plus the context id) otherwise.
+        /// </summary>
+        /// <param name="activeContext">Context the task is queued on.</param>
+        public void OnNext(IActiveContext activeContext)
+        {
+            string taskId;
+            IConfiguration taskConf;
+
+            if (_mpiDriver.IsMasterTaskContext(activeContext))
+            {
+                Logger.Log(Level.Info, "******* Master ID " + activeContext.Id);
+
+                // Configure Master Task
+                taskId = MpiTestConstants.MasterTaskId;
+                taskConf = TaskConfiguration.ConfigurationModule
+                    .Set(TaskConfiguration.Identifier, taskId)
+                    .Set(TaskConfiguration.Task, GenericType<PipelinedMasterTask>.Class)
+                    .Build();
+            }
+            else
+            {
+                // Configure Slave Task
+                taskId = "SlaveTask-" + activeContext.Id;
+                taskConf = TaskConfiguration.ConfigurationModule
+                    .Set(TaskConfiguration.Identifier, taskId)
+                    .Set(TaskConfiguration.Task, GenericType<PipelinedSlaveTask>.Class)
+                    .Build();
+            }
+
+            IConfiguration partialTaskConf = BindTestParameters(taskConf);
+
+            // Same order as before: register the task with the group, then queue it.
+            _commGroup.AddTask(taskId);
+            _mpiTaskStarter.QueueTask(partialTaskConf, activeContext);
+        }
+
+        /// <summary>
+        /// Extends a task configuration with the named parameters shared by the master and
+        /// slave tasks: number of evaluators, number of iterations and array size.
+        /// </summary>
+        /// <param name="taskConfiguration">Built task configuration to extend.</param>
+        /// <returns>The task configuration plus the three named-parameter bindings.</returns>
+        private IConfiguration BindTestParameters(IConfiguration taskConfiguration)
+        {
+            return TangFactory.GetTang().NewConfigurationBuilder(taskConfiguration)
+                .BindNamedParameter<MpiTestConfig.NumEvaluators, int>(
+                    GenericType<MpiTestConfig.NumEvaluators>.Class,
+                    _numEvaluators.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter<MpiTestConfig.NumIterations, int>(
+                    GenericType<MpiTestConfig.NumIterations>.Class,
+                    _numIterations.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter<MpiTestConfig.ArraySize, int>(
+                    GenericType<MpiTestConfig.ArraySize>.Class,
+                    MpiTestConstants.ArrayLength.ToString(CultureInfo.InvariantCulture))
+                .Build();
+        }
+
+        /// <summary>
+        /// Logs an evaluator failure. No recovery is attempted.
+        /// </summary>
+        /// <param name="value">The failed evaluator.</param>
+        public void OnNext(IFailedEvaluator value)
+        {
+            // This handler used to be an empty body, so a failed evaluator left no trace
+            // in the driver log.
+            Logger.Log(Level.Error, "Evaluator failed: {0}", value.Id);
+        }
+
+        /// <summary>
+        /// Logs an error reported to the driver. The error is not rethrown.
+        /// </summary>
+        /// <param name="error">The reported exception.</param>
+        public void OnError(Exception error)
+        {
+            // This handler used to be an empty body; log the error so it is visible.
+            Logger.Log(Level.Error, "PipelinedBroadcastReduceDriver received an error", error);
+        }
+
+        /// <summary>
+        /// Completion notification; this driver takes no action.
+        /// </summary>
+        public void OnCompleted()
+        {
+        }
+
+        /// <summary>
+        /// Collects the names of the assemblies that contain the driver, task, this test,
+        /// name-client and network-service types and passes them to
+        /// <c>ClrHandlerHelper.GenerateClassHierarchy</c>.
+        /// </summary>
+        private void CreateClassHierarchy()
+        {
+            var assemblyNames = new HashSet<string>
+            {
+                typeof(IDriver).Assembly.GetName().Name,
+                typeof(ITask).Assembly.GetName().Name,
+                typeof(PipelinedBroadcastReduceDriver).Assembly.GetName().Name,
+                typeof(INameClient).Assembly.GetName().Name,
+                typeof(INetworkService<>).Assembly.GetName().Name
+            };
+
+            ClrHandlerHelper.GenerateClassHierarchy(assemblyNames);
+        }
+
+        /// <summary>
+        /// Reduce function that adds up a sequence of integers.
+        /// </summary>
+        private class SumFunction : IReduceFunction<int>
+        {
+            [Inject]
+            public SumFunction()
+            {
+            }
+
+            /// <summary>
+            /// Returns the sum of all <paramref name="elements"/>.
+            /// </summary>
+            /// <param name="elements">Values to add up.</param>
+            /// <returns>The sum as computed by LINQ's <c>Sum</c>.</returns>
+            public int Reduce(IEnumerable<int> elements)
+            {
+                int total = Enumerable.Sum(elements);
+                return total;
+            }
+        }
+
+        /// <summary>
+        /// Reduce function that adds integer arrays element-wise.
+        /// </summary>
+        private class ArraySumFunction : IReduceFunction<int[]>
+        {
+            [Inject]
+            public ArraySumFunction()
+            {
+            }
+
+            /// <summary>
+            /// Adds all arrays in <paramref name="elements"/> position by position.
+            /// </summary>
+            /// <param name="elements">Arrays to add; all must have the same length.</param>
+            /// <returns>
+            /// A new array holding the element-wise sums (the inputs are not modified),
+            /// or null when <paramref name="elements"/> yields no arrays.
+            /// </returns>
+            /// <exception cref="ArgumentException">An array differs in length from the first one.</exception>
+            public int[] Reduce(IEnumerable<int[]> elements)
+            {
+                int[] result = null;
+
+                foreach (var element in elements)
+                {
+                    if (result == null)
+                    {
+                        // Copy the first array so the caller's data is never mutated.
+                        // A direct cast replaces the former unchecked "as" cast.
+                        result = (int[])element.Clone();
+                        continue;
+                    }
+
+                    if (element.Length != result.Length)
+                    {
+                        throw new ArgumentException("integer arrays are of different sizes", "elements");
+                    }
+
+                    for (int i = 0; i < result.Length; i++)
+                    {
+                        result[i] += element[i];
+                    }
+                }
+
+                return result;
+            }
+        }
+
+
+        /// <summary>
+        /// Codec that converts an int array to and from its raw byte representation
+        /// using <c>Buffer.BlockCopy</c>.
+        /// </summary>
+        private class IntArrayCodec : ICodec<int[]>
+        {
+            [Inject]
+            public IntArrayCodec()
+            {
+            }
+
+            /// <summary>
+            /// Copies the bytes of <paramref name="obj"/> into a new byte array.
+            /// </summary>
+            /// <param name="obj">Array to encode.</param>
+            /// <returns>A byte array of length <c>sizeof(Int32) * obj.Length</c>.</returns>
+            /// <exception cref="ArgumentNullException"><paramref name="obj"/> is null.</exception>
+            public byte[] Encode(int[] obj)
+            {
+                if (obj == null)
+                {
+                    throw new ArgumentNullException("obj");
+                }
+
+                byte[] result = new byte[sizeof(Int32) * obj.Length];
+                Buffer.BlockCopy(obj, 0, result, 0, result.Length);
+                return result;
+            }
+
+            /// <summary>
+            /// Rebuilds an int array from bytes produced by <see cref="Encode"/>.
+            /// </summary>
+            /// <param name="data">Bytes to decode; the length must be a multiple of <c>sizeof(Int32)</c>.</param>
+            /// <returns>The decoded int array.</returns>
+            /// <exception cref="ArgumentNullException"><paramref name="data"/> is null.</exception>
+            /// <exception cref="ArgumentException">The length of <paramref name="data"/> is not a multiple of the integer size.</exception>
+            public int[] Decode(byte[] data)
+            {
+                if (data == null)
+                {
+                    throw new ArgumentNullException("data");
+                }
+
+                if (data.Length % sizeof(Int32) != 0)
+                {
+                    throw new ArgumentException(
+                        "error inside integer array decoder, byte array length not a multiple of integer size",
+                        "data");
+                }
+
+                int[] result = new int[data.Length / sizeof(Int32)];
+                Buffer.BlockCopy(data, 0, result, 0, data.Length);
+                return result;
+            }
+        }
+
+        /// <summary>
+        /// Pipeline data converter for int arrays: splits an array into chunks of at most
+        /// <c>chunkSize</c> elements and reassembles such chunks into one array.
+        /// </summary>
+        public class PipelineIntDataConverter : IPipelineDataConverter<int[]>
+        {
+            private readonly int _chunkSize;
+
+            /// <summary>
+            /// Creates a converter with the given chunk size.
+            /// </summary>
+            /// <param name="chunkSize">Maximum number of ints per chunk; must be positive.</param>
+            /// <exception cref="ArgumentOutOfRangeException"><paramref name="chunkSize"/> is zero or negative.</exception>
+            [Inject]
+            public PipelineIntDataConverter([Parameter(typeof(MpiTestConfig.ChunkSize))] int chunkSize)
+            {
+                // Fail fast: a zero chunk size would otherwise surface later as a
+                // DivideByZeroException and a negative one as a failed array allocation.
+                if (chunkSize <= 0)
+                {
+                    throw new ArgumentOutOfRangeException("chunkSize", chunkSize, "chunk size must be positive");
+                }
+
+                _chunkSize = chunkSize;
+            }
+
+            /// <summary>
+            /// Splits <paramref name="message"/> into consecutive chunks.
+            /// </summary>
+            /// <param name="message">Array to split.</param>
+            /// <returns>
+            /// One pipeline message per chunk, in order; only the final chunk is flagged as
+            /// last. An empty input array yields an empty list.
+            /// </returns>
+            public List<PipelineMessage<int[]>> PipelineMessage(int[] message)
+            {
+                List<PipelineMessage<int[]>> messageList = new List<PipelineMessage<int[]>>();
+
+                for (int start = 0; start < message.Length; start += _chunkSize)
+                {
+                    int length = Math.Min(_chunkSize, message.Length - start);
+                    int[] data = new int[length];
+
+                    // Buffer.BlockCopy works with byte offsets and byte counts.
+                    Buffer.BlockCopy(message, start * sizeof(int), data, 0, length * sizeof(int));
+
+                    // The chunk that reaches the end of the input is the last one.
+                    bool isLast = start + length == message.Length;
+                    messageList.Add(new PipelineMessage<int[]>(data, isLast));
+                }
+
+                return messageList;
+            }
+
+            /// <summary>
+            /// Concatenates the data of all chunks, in list order, into one array.
+            /// </summary>
+            /// <param name="pipelineMessage">Chunks to reassemble.</param>
+            /// <returns>A new array containing the data of every chunk back to back.</returns>
+            public int[] FullMessage(List<PipelineMessage<int[]>> pipelineMessage)
+            {
+                int size = pipelineMessage.Select(x => x.Data.Length).Sum();
+                int[] data = new int[size];
+                int byteOffset = 0;
+
+                foreach (var message in pipelineMessage)
+                {
+                    int byteCount = message.Data.Length * sizeof(int);
+                    Buffer.BlockCopy(message.Data, 0, data, byteOffset, byteCount);
+                    byteOffset += byteCount;
+                }
+
+                return data;
+            }
+
+            /// <summary>
+            /// Returns a configuration that binds MpiTestConfig.ChunkSize to this
+            /// converter's chunk size.
+            /// </summary>
+            public IConfiguration GetConfiguration()
+            {
+                return TangFactory.GetTang().NewConfigurationBuilder()
+                    .BindNamedParameter<MpiTestConfig.ChunkSize, int>(
+                        GenericType<MpiTestConfig.ChunkSize>.Class,
+                        _chunkSize.ToString(CultureInfo.InvariantCulture))
+                    .Build();
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedBroadcastReduceTest.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedBroadcastReduceTest.cs
 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedBroadcastReduceTest.cs
new file mode 100644
index 0000000..0df71e6
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedBroadcastReduceTest.cs
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Generic;
+using System.Globalization;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+using Org.Apache.REEF.Common.Io;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Bridge;
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Tests.Functional.MPI.PipelinedBroadcastReduceTest
+{
+    /// <summary>
+    /// Functional test that runs <see cref="PipelinedBroadcastReduceDriver"/> with nine
+    /// tasks and validates the run with ValidateSuccessForLocalRuntime.
+    /// </summary>
+    [TestClass]
+    public class PipelinedBroadcastReduceTest : ReefFunctionalTest
+    {
+        /// <summary>Runs the base-class clean-up before each test.</summary>
+        [TestInitialize]
+        public void TestSetup()
+        {
+            CleanUp();
+        }
+
+        /// <summary>Runs the base-class clean-up after each test.</summary>
+        [TestCleanup]
+        public void TestCleanup()
+        {
+            CleanUp();
+        }
+
+        /// <summary>
+        /// Builds the driver and MPI configurations, merges them, runs the test and
+        /// validates that it succeeded for all tasks.
+        /// </summary>
+        [TestMethod]
+        public void TestBroadcastAndReduce()
+        {
+            const int numTasks = 9;
+
+            // The same driver class handles every bridge event bound below.
+            IConfiguration driverConfig = TangFactory.GetTang().NewConfigurationBuilder(
+                DriverBridgeConfiguration.ConfigurationModule
+                    .Set(DriverBridgeConfiguration.OnDriverStarted, GenericType<PipelinedBroadcastReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.OnEvaluatorAllocated, GenericType<PipelinedBroadcastReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.OnEvaluatorRequested, GenericType<PipelinedBroadcastReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.OnEvaluatorFailed, GenericType<PipelinedBroadcastReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.OnContextActive, GenericType<PipelinedBroadcastReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.CustomTraceLevel, Level.Info.ToString())
+                    .Build())
+                .BindNamedParameter<MpiTestConfig.NumIterations, int>(
+                    GenericType<MpiTestConfig.NumIterations>.Class,
+                    MpiTestConstants.NumIterations.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter<MpiTestConfig.NumEvaluators, int>(
+                    GenericType<MpiTestConfig.NumEvaluators>.Class,
+                    numTasks.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter<MpiTestConfig.ChunkSize, int>(
+                    GenericType<MpiTestConfig.ChunkSize>.Class,
+                    MpiTestConstants.ChunkSize.ToString(CultureInfo.InvariantCulture))
+                .Build();
+
+            // FanOut is converted to a string once; the former second ToString call on the
+            // resulting string was redundant.
+            IConfiguration mpiDriverConfig = TangFactory.GetTang().NewConfigurationBuilder()
+                .BindStringNamedParam<MpiConfigurationOptions.DriverId>(MpiTestConstants.DriverId)
+                .BindStringNamedParam<MpiConfigurationOptions.MasterTaskId>(MpiTestConstants.MasterTaskId)
+                .BindStringNamedParam<MpiConfigurationOptions.GroupName>(MpiTestConstants.GroupName)
+                .BindIntNamedParam<MpiConfigurationOptions.FanOut>(MpiTestConstants.FanOut.ToString(CultureInfo.InvariantCulture))
+                .BindIntNamedParam<MpiConfigurationOptions.NumberOfTasks>(numTasks.ToString(CultureInfo.InvariantCulture))
+                .Build();
+
+            IConfiguration merged = Configurations.Merge(driverConfig, mpiDriverConfig);
+
+            HashSet<string> appDlls = new HashSet<string>();
+            appDlls.Add(typeof(IDriver).Assembly.GetName().Name);
+            appDlls.Add(typeof(ITask).Assembly.GetName().Name);
+            appDlls.Add(typeof(PipelinedBroadcastReduceDriver).Assembly.GetName().Name);
+            appDlls.Add(typeof(INameClient).Assembly.GetName().Name);
+            appDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name);
+
+            TestRun(appDlls, merged, false, JavaLoggingSetting.VERBOSE);
+            ValidateSuccessForLocalRuntime(numTasks);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedMasterTask.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedMasterTask.cs
 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedMasterTask.cs
new file mode 100644
index 0000000..922f294
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedMasterTask.cs
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Linq;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Task;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Tests.Functional.MPI.PipelinedBroadcastReduceTest
+{
+    /// <summary>
+    /// Master side of the pipelined broadcast/reduce test. On iteration i it broadcasts an
+    /// array filled with i, receives the reduced array and checks that every element
+    /// equals TriangleNumber(i) multiplied by the number of reduce senders.
+    /// </summary>
+    public class PipelinedMasterTask : ITask
+    {
+        private static readonly Logger Logger = Logger.GetLogger(typeof(PipelinedMasterTask));
+
+        private readonly int _numIters;
+        private readonly int _numReduceSenders;
+        private readonly int _arraySize;
+
+        private readonly IMpiClient _mpiClient;
+        private readonly ICommunicationGroupClient _commGroup;
+        private readonly IBroadcastSender<int[]> _broadcastSender;
+        private readonly IReduceReceiver<int[]> _sumReducer;
+
+        /// <summary>
+        /// Stores the injected settings and looks up the broadcast sender and the reduce
+        /// receiver in the test's communication group.
+        /// </summary>
+        /// <param name="numIters">Number of broadcast/reduce rounds to run.</param>
+        /// <param name="numEvaluators">Total number of evaluators; all but one are counted as reduce senders.</param>
+        /// <param name="arraySize">Length of the array broadcast in every round.</param>
+        /// <param name="mpiClient">Client used to obtain the communication group; disposed in <see cref="Dispose"/>.</param>
+        [Inject]
+        public PipelinedMasterTask(
+            [Parameter(typeof(MpiTestConfig.NumIterations))] int numIters,
+            [Parameter(typeof(MpiTestConfig.NumEvaluators))] int numEvaluators,
+            [Parameter(typeof(MpiTestConfig.ArraySize))] int arraySize,
+            IMpiClient mpiClient)
+        {
+            Logger.Log(Level.Info, "Hello from master task");
+            _numIters = numIters;
+            _numReduceSenders = numEvaluators - 1;
+            _arraySize = arraySize;
+            _mpiClient = mpiClient;
+
+            _commGroup = mpiClient.GetCommunicationGroup(MpiTestConstants.GroupName);
+            _broadcastSender = _commGroup.GetBroadcastSender<int[]>(MpiTestConstants.BroadcastOperatorName);
+            _sumReducer = _commGroup.GetReduceReceiver<int[]>(MpiTestConstants.ReduceOperatorName);
+            Logger.Log(Level.Info, "finished master task constructor");
+        }
+
+        /// <summary>
+        /// Runs the broadcast/reduce rounds and verifies every reduced array.
+        /// </summary>
+        /// <param name="memento">Not used.</param>
+        /// <returns>Always null.</returns>
+        /// <exception cref="InvalidOperationException">
+        /// The reduced array is missing, has the wrong length, or contains an unexpected value.
+        /// </exception>
+        public byte[] Call(byte[] memento)
+        {
+            int[] intArr = new int[_arraySize];
+
+            for (int i = 1; i <= _numIters; i++)
+            {
+                for (int j = 0; j < _arraySize; j++)
+                {
+                    intArr[j] = i;
+                }
+
+                _broadcastSender.Send(intArr);
+                int[] sum = _sumReducer.Reduce();
+
+                if (sum == null || sum.Length != intArr.Length)
+                {
+                    throw new InvalidOperationException(
+                        "Expected a reduced array of length " + intArr.Length + " but got " +
+                        (sum == null ? "null" : "length " + sum.Length) + " on iteration " + i);
+                }
+
+                // Join the values explicitly: formatting the array itself would only
+                // print its type name.
+                Logger.Log(Level.Info, "Received sum: {0} on iteration: {1}", string.Join(",", sum), i);
+
+                // Every sender contributes TriangleNumber(i) at each position.
+                int expected = TriangleNumber(i) * _numReduceSenders;
+
+                for (int j = 0; j < sum.Length; j++)
+                {
+                    if (sum[j] != expected)
+                    {
+                        throw new InvalidOperationException(
+                            "Expected " + expected + " but got " + sum[j] + " at index " + j + " on iteration " + i);
+                    }
+                }
+            }
+
+            return null;
+        }
+
+        /// <summary>
+        /// Disposes the MPI client.
+        /// </summary>
+        public void Dispose()
+        {
+            _mpiClient.Dispose();
+        }
+
+        /// <summary>
+        /// Returns 1 + 2 + ... + n.
+        /// </summary>
+        private int TriangleNumber(int n)
+        {
+            return Enumerable.Range(1, n).Sum();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedSlaveTask.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedSlaveTask.cs
 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedSlaveTask.cs
new file mode 100644
index 0000000..5455121
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Tests/Functional/MPI/PipelinedBroadcastReduceTest/PipelinedSlaveTask.cs
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Linq;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Task;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Tests.Functional.MPI.PipelinedBroadcastReduceTest
+{
+    /// <summary>
+    /// Slave side of the pipelined broadcast/reduce test. On every iteration it receives
+    /// an int array through the broadcast operator, computes the triangle number of the
+    /// array's first element and sends an equally sized array filled with that number
+    /// through the reduce operator.
+    /// </summary>
+    public class PipelinedSlaveTask : ITask
+    {
+        private static readonly Logger Logger = Logger.GetLogger(typeof(PipelinedSlaveTask));
+
+        private readonly int _numIterations;
+        private readonly IMpiClient _mpiClient;
+        private readonly ICommunicationGroupClient _commGroup;
+        private readonly IBroadcastReceiver<int[]> _broadcastReceiver;
+        private readonly IReduceSender<int[]> _triangleNumberSender;
+
+        /// <summary>
+        /// Stores the iteration count and looks up the broadcast receiver and the reduce
+        /// sender in the test's communication group.
+        /// </summary>
+        /// <param name="numIters">Number of broadcast/reduce rounds to take part in.</param>
+        /// <param name="mpiClient">Client used to obtain the communication group; disposed in <see cref="Dispose"/>.</param>
+        [Inject]
+        public PipelinedSlaveTask(
+            [Parameter(typeof(MpiTestConfig.NumIterations))] int numIters,
+            IMpiClient mpiClient)
+        {
+            Logger.Log(Level.Info, "Hello from slave task");
+
+            _mpiClient = mpiClient;
+            _numIterations = numIters;
+
+            _commGroup = _mpiClient.GetCommunicationGroup(MpiTestConstants.GroupName);
+            _broadcastReceiver = _commGroup.GetBroadcastReceiver<int[]>(MpiTestConstants.BroadcastOperatorName);
+            _triangleNumberSender = _commGroup.GetReduceSender<int[]>(MpiTestConstants.ReduceOperatorName);
+        }
+
+        /// <summary>
+        /// Runs the receive / compute / send loop for the configured number of iterations.
+        /// </summary>
+        /// <param name="memento">Not used.</param>
+        /// <returns>Always null.</returns>
+        public byte[] Call(byte[] memento)
+        {
+            for (int iteration = 0; iteration < _numIterations; iteration++)
+            {
+                // The first element of the broadcast array carries n.
+                int[] received = _broadcastReceiver.Receive();
+                int n = received[0];
+
+                Logger.Log(Level.Info, "Calculating TriangleNumber({0}) on slave task...", n);
+
+                int triangle = TriangleNumber(n);
+                Logger.Log(Level.Info, "Sending sum: {0} on iteration {1}.", triangle, iteration);
+
+                // Reply with an array of the same length, every slot set to the result.
+                int[] reply = Enumerable.Repeat(triangle, received.Length).ToArray();
+
+                _triangleNumberSender.Send(reply);
+            }
+
+            return null;
+        }
+
+        /// <summary>
+        /// Disposes the MPI client.
+        /// </summary>
+        public void Dispose()
+        {
+            _mpiClient.Dispose();
+        }
+
+        /// <summary>
+        /// Returns 1 + 2 + ... + n.
+        /// </summary>
+        private int TriangleNumber(int n)
+        {
+            return Enumerable.Range(1, n).Sum();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c02c80da/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj 
b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
index e559ce3..a915f59 100644
--- a/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
+++ b/lang/cs/Org.Apache.REEF.Tests/Org.Apache.REEF.Tests.csproj
@@ -64,6 +64,10 @@ under the License.
     <Compile Include="Functional\MPI\BroadcastReduceTest\SlaveTask.cs" />
     <Compile Include="Functional\MPI\MpiTestConfig.cs" />
     <Compile Include="Functional\MPI\MpiTestConstants.cs" />
+    <Compile 
Include="Functional\MPI\PipelinedBroadcastReduceTest\PipelinedBroadcastReduceDriver.cs"
 />
+    <Compile 
Include="Functional\MPI\PipelinedBroadcastReduceTest\PipelinedBroadcastReduceTest.cs"
 />
+    <Compile 
Include="Functional\MPI\PipelinedBroadcastReduceTest\PipelinedMasterTask.cs" />
+    <Compile 
Include="Functional\MPI\PipelinedBroadcastReduceTest\PipelinedSlaveTask.cs" />
     <Compile Include="Functional\MPI\ScatterReduceTest\MasterTask.cs" />
     <Compile Include="Functional\MPI\ScatterReduceTest\ScatterReduceDriver.cs" 
/>
     <Compile Include="Functional\MPI\ScatterReduceTest\ScatterReduceTest.cs" />
@@ -78,11 +82,11 @@ under the License.
     <Compile Include="Utility\TestDriverConfigGenerator.cs" />
     <Compile Include="Utility\TestExceptions.cs" />
   </ItemGroup>
-  <ItemGroup>  
+  <ItemGroup>
     <None Include="run.cmd">
       <CopyToOutputDirectory>Always</CopyToOutputDirectory>
     </None>
-  <None Include="ConfigFiles\evaluator.conf">
+    <None Include="ConfigFiles\evaluator.conf">
       <CopyToOutputDirectory>Always</CopyToOutputDirectory>
     </None>
     <None Include="packages.config" />
@@ -124,7 +128,7 @@ under the License.
       <Project>{1b983182-9c30-464c-948d-f87eb93a8240}</Project>
       <Name>Org.Apache.REEF.Evaluator</Name>
     </ProjectReference>
-    <ProjectReference 
Include="..\Org.Apache.REEF.Bridge\Org.Apache.REEF.Bridge.vcxproj">
+    <ProjectReference 
Include="$(SolutionDir)\Org.Apache.REEF.Bridge\Org.Apache.REEF.Bridge.vcxproj">
       <Project>{4e69d40a-26d6-4d4a-b96d-729946c07fe1}</Project>
       <Name>Org.Apache.REEF.Bridge</Name>
     </ProjectReference>

Reply via email to