[REEF-249]  Adding Network.Examples for Group Communication

This PR adds a sample client for Group Communication so that we can run it 
on a YARN environment.

- It refactored the GroupCommunication tests and moved the drivers and tasks into 
Network.Examples so that the code can be shared by both the tests and the client.
- It also added Network.Examples.Client as a console app. It contains the driver 
configuration for the group communication samples. The app can run on YARN without 
any VS or test framework dependency.
- The original folder structure of the group communication tests is simplified and MPI 
is renamed to Group.
- The Client project is updated to reference the jar file and DLLs from its own binary 
folder, to be consistent with the other client projects.

JIRA: REEF-249. (https://issues.apache.org/jira/browse/REEF-249)

This closes #143

Author: Julia Wang  Email: [email protected]


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/c85d45a2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/c85d45a2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/c85d45a2

Branch: refs/heads/master
Commit: c85d45a27ef897af1b8214471333d9b17fd43134
Parents: 3686317
Author: Julia Wang <[email protected]>
Authored: Mon Apr 13 17:17:21 2015 -0700
Committer: Beysim Sezgin <[email protected]>
Committed: Tue Apr 14 11:00:38 2015 -0700

----------------------------------------------------------------------
 .../Org.Apache.REEF.Client/CLRBridgeClient.cs   |   8 +-
 .../Org.Apache.REEF.Client.csproj               |  15 +-
 lang/cs/Org.Apache.REEF.Client/run.cmd          |   2 +-
 .../Bridge/ClrHandlerHelper.cs                  |   2 +-
 .../BroadcastAndReduceClient.cs                 |  85 +++++
 ...g.Apache.REEF.Network.Examples.Client.csproj | 116 +++++++
 .../PipelineBroadcastAndReduceClient.cs         |  86 +++++
 .../Properties/AssemblyInfo.cs                  |  55 ++++
 .../Run.cs                                      |  58 ++++
 .../run.cmd                                     |  45 +++
 .../BroadcastReduceDriver.cs                    | 174 ++++++++++
 .../BroadcastReduceDriverAndTasks/MasterTask.cs |  89 ++++++
 .../BroadcastReduceDriverAndTasks/SlaveTask.cs  |  80 +++++
 .../GroupCommunication/GroupTestConfig.cs       |  56 ++++
 .../GroupCommunication/GroupTestConstants.cs    |  36 +++
 .../PipelinedBroadcastReduceDriver.cs           | 320 +++++++++++++++++++
 .../PipelinedMasterTask.cs                      | 102 ++++++
 .../PipelinedSlaveTask.cs                       |  89 ++++++
 .../ScatterReduceDriverAndTasks/MasterTask.cs   |  71 ++++
 .../ScatterReduceDriver.cs                      | 157 +++++++++
 .../ScatterReduceDriverAndTasks/SlaveTask.cs    |  67 ++++
 .../Org.Apache.REEF.Network.Examples.csproj     |  96 ++++++
 .../Properties/AssemblyInfo.cs                  |  53 +++
 .../ConfigFiles/evaluator.conf                  | Bin 2837 -> 0 bytes
 .../Functional/Group/BroadcastReduceTest.cs     | 110 +++++++
 .../Group/PipelinedBroadcastReduceTest.cs       | 112 +++++++
 .../Functional/Group/ScatterReduceTest.cs       | 107 +++++++
 .../BroadcastReduceDriver.cs                    | 177 ----------
 .../BroadcastReduceTest/BroadcastReduceTest.cs  | 109 -------
 .../MPI/BroadcastReduceTest/MasterTask.cs       |  89 ------
 .../MPI/BroadcastReduceTest/SlaveTask.cs        |  80 -----
 .../Functional/MPI/MpiTestConfig.cs             |  56 ----
 .../Functional/MPI/MpiTestConstants.cs          |  36 ---
 .../PipelinedBroadcastReduceDriver.cs           | 320 -------------------
 .../PipelinedBroadcastReduceTest.cs             | 110 -------
 .../PipelinedMasterTask.cs                      | 102 ------
 .../PipelinedSlaveTask.cs                       |  89 ------
 .../MPI/ScatterReduceTest/MasterTask.cs         |  71 ----
 .../ScatterReduceTest/ScatterReduceDriver.cs    | 162 ----------
 .../MPI/ScatterReduceTest/ScatterReduceTest.cs  | 104 ------
 .../MPI/ScatterReduceTest/SlaveTask.cs          |  67 ----
 .../Org.Apache.REEF.Tests.csproj                |  24 +-
 lang/cs/Org.Apache.REEF.Tests/run.cmd           |   2 +-
 lang/cs/Org.Apache.REEF.sln                     | Bin 42590 -> 24040 bytes
 44 files changed, 2192 insertions(+), 1597 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Client/CLRBridgeClient.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/CLRBridgeClient.cs 
b/lang/cs/Org.Apache.REEF.Client/CLRBridgeClient.cs
index 5ec77c6..f6d8f82 100644
--- a/lang/cs/Org.Apache.REEF.Client/CLRBridgeClient.cs
+++ b/lang/cs/Org.Apache.REEF.Client/CLRBridgeClient.cs
@@ -36,8 +36,8 @@ namespace Org.Apache.REEF.Client
     public class CLRBridgeClient
     {
         public const string ReefHome = "REEF_HOME";
-        public const string DefaultClrFolder = 
@"lang\java\reef-bridge-java\dotnetHello";
-        public const string DefaultReefJar = 
@"lang\java\reef-bridge-java\target\" + Constants.JavaBridgeJarFileName;
+        public const string DefaultClrFolder = ".";
+        public const string DefaultReefJar = Constants.JavaBridgeJarFileName;
         public const string DefaultRunCommand = "run.cmd";
 
         private static string _clrFolder = null;
@@ -116,12 +116,12 @@ namespace Org.Apache.REEF.Client
 
             if (string.IsNullOrWhiteSpace(_reefJar))
             {
-                _reefJar = 
Path.Combine(Environment.GetEnvironmentVariable(ReefHome), DefaultReefJar);
+                _reefJar = DefaultReefJar;
             }
 
             if (string.IsNullOrWhiteSpace(_clrFolder))
             {
-                _clrFolder = 
Path.Combine(Environment.GetEnvironmentVariable(ReefHome), DefaultClrFolder);
+                _clrFolder = DefaultClrFolder;
             }
 
             // Configurable driver submission settings:

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj 
b/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
index b7173c3..69c939a 100644
--- a/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
+++ b/lang/cs/Org.Apache.REEF.Client/Org.Apache.REEF.Client.csproj
@@ -89,10 +89,14 @@ under the License.
       <Project>{75503f90-7b82-4762-9997-94b5c68f15db}</Project>
       <Name>Org.Apache.REEF.Examples</Name>
     </ProjectReference>
-    <ProjectReference 
Include="..\Org.Apache.REEF.Bridge\Org.Apache.REEF.Bridge.vcxproj">
+    <ProjectReference 
Include="$(SolutionDir)\Org.Apache.REEF.Bridge\Org.Apache.REEF.Bridge.vcxproj">
       <Project>{4e69d40a-26d6-4d4a-b96d-729946c07fe1}</Project>
       <Name>Org.Apache.REEF.Bridge</Name>
     </ProjectReference>
+    <ProjectReference 
Include="$(SolutionDir)\Org.Apache.REEF.Evaluator\Org.Apache.REEF.Evaluator.csproj">
+      <Project>{1b983182-9c30-464c-948d-f87eb93a8240}</Project>
+      <Name>Org.Apache.REEF.Evaluator</Name>
+    </ProjectReference>
   </ItemGroup>
   <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
   <Import Project="$(SolutionDir)\.nuget\NuGet.targets" 
Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" />
@@ -102,6 +106,15 @@ under the License.
     </PropertyGroup>
     <Error Condition="!Exists('$(SolutionDir)\.nuget\NuGet.targets')" 
Text="$([System.String]::Format('$(ErrorText)', 
'$(SolutionDir)\.nuget\NuGet.targets'))" />
   </Target>
+  <!--begin jar reference-->
+  <PropertyGroup>
+    <AfterBuildDependsOn>
+      $(AfterBuildDependsOn);
+      CopyJarFiles;
+    </AfterBuildDependsOn>
+  </PropertyGroup>
+  <Target Name="AfterBuild" DependsOnTargets="$(AfterBuildDependsOn);" />
+  <!--end jar reference-->
   <!-- To modify your build process, add your task inside one of the targets 
below and uncomment it. 
        Other similar extension points exist, see Microsoft.Common.targets.
   <Target Name="BeforeBuild">

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Client/run.cmd
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Client/run.cmd 
b/lang/cs/Org.Apache.REEF.Client/run.cmd
index 3b64520..ae69a5f 100644
--- a/lang/cs/Org.Apache.REEF.Client/run.cmd
+++ b/lang/cs/Org.Apache.REEF.Client/run.cmd
@@ -33,7 +33,7 @@
 
 
 :: RUNTIME
-set 
SHADED_JAR=%REEF_HOME%\lang\reef-bridge\target\reef-bridge-0.11.0-incubating-SNAPSHOT-shaded.jar
+set 
SHADED_JAR=%REEF_HOME%\lang\reef-bridge\target\reef-bridge-java-0.11.0-incubating-SNAPSHOT-shaded.jar
 
 set 
LOGGING_CONFIG=-Djava.util.logging.config.class=org.apache.reef.util.logging.CLRLoggingConfig
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrHandlerHelper.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrHandlerHelper.cs 
b/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrHandlerHelper.cs
index bce5ce4..6ef87bb 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrHandlerHelper.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/ClrHandlerHelper.cs
@@ -40,7 +40,7 @@ namespace Org.Apache.REEF.Driver.Bridge
         {
             get
             {
-                return new[] { "Microsoft.Hadoop.Avro.dll", 
"Org.Apache.REEF.Driver.dll", "Org.Apache.REEF.Common.dll", 
"Org.Apache.REEF.Utilities.dll", "Org.Apache.REEF.Network.dll", 
"Org.Apache.REEF.Tang.dll", "Org.Apache.REEF.Wake.dll", "Newtonsoft.Json.dll", 
"protobuf-net.dll" };
+                return new[] { "Microsoft.Hadoop.Avro.dll", 
"Org.Apache.REEF.Driver.dll", "Org.Apache.REEF.Common.dll", 
"Org.Apache.REEF.Utilities.dll", "Org.Apache.REEF.Network.dll", 
"Org.Apache.REEF.Tang.dll", "Org.Apache.REEF.Wake.dll", 
"Org.Apache.REEF.Bridge.dll", "Newtonsoft.Json.dll", "protobuf-net.dll" };
             }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Network.Examples.Client/BroadcastAndReduceClient.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Examples.Client/BroadcastAndReduceClient.cs 
b/lang/cs/Org.Apache.REEF.Network.Examples.Client/BroadcastAndReduceClient.cs
new file mode 100644
index 0000000..0db6ce0
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Network.Examples.Client/BroadcastAndReduceClient.cs
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Generic;
+using System.Globalization;
+using Org.Apache.REEF.Common.Io;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Bridge;
+using Org.Apache.REEF.Network.Examples.GroupCommunication;
+using 
Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDriverAndTasks;
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Network.Examples.Client
+{
+    /// <summary>
+    /// Client that submits the broadcast-and-reduce group communication example,
+    /// using BroadcastReduceDriver as the driver-side event handler.
+    /// </summary>
+    class BroadcastAndReduceClient
+    {
+        /// <summary>
+        /// Builds the driver bridge configuration, merges it with the group communication
+        /// options, and submits the result through ClrClientHelper.Run.
+        /// </summary>
+        /// <param name="runOnYarn">Forwarded as DriverSubmissionSettings.RunOnYarn.</param>
+        /// <param name="numTasks">
+        /// Bound to both GroupTestConfig.NumEvaluators and MpiConfigurationOptions.NumberOfTasks.
+        /// </param>
+        public void RunBroadcastAndReduce(bool runOnYarn, int numTasks)
+        {
+            const int numIterations = 10;
+            const string driverId = "BroadcastReduceDriver";
+            const string groupName = "BroadcastReduceGroup";
+            const string masterTaskId = "MasterTask";
+            const int fanOut = 2;
+
+            // BroadcastReduceDriver is registered for every driver event used by the example.
+            IConfiguration driverConfig = TangFactory.GetTang().NewConfigurationBuilder(
+                DriverBridgeConfiguration.ConfigurationModule
+                    .Set(DriverBridgeConfiguration.OnDriverStarted, GenericType<BroadcastReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.OnEvaluatorAllocated, GenericType<BroadcastReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.OnEvaluatorRequested, GenericType<BroadcastReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.OnEvaluatorFailed, GenericType<BroadcastReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.OnContextActive, GenericType<BroadcastReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.CustomTraceLevel, Level.Info.ToString())
+                    .Build())
+                .BindNamedParameter<GroupTestConfig.NumIterations, int>(
+                    GenericType<GroupTestConfig.NumIterations>.Class,
+                    numIterations.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter<GroupTestConfig.NumEvaluators, int>(
+                    GenericType<GroupTestConfig.NumEvaluators>.Class,
+                    numTasks.ToString(CultureInfo.InvariantCulture))
+                .Build();
+
+            // Numeric options are passed as invariant-culture strings (converted exactly once).
+            IConfiguration mpiDriverConfig = TangFactory.GetTang().NewConfigurationBuilder()
+                .BindStringNamedParam<MpiConfigurationOptions.DriverId>(driverId)
+                .BindStringNamedParam<MpiConfigurationOptions.MasterTaskId>(masterTaskId)
+                .BindStringNamedParam<MpiConfigurationOptions.GroupName>(groupName)
+                .BindIntNamedParam<MpiConfigurationOptions.FanOut>(fanOut.ToString(CultureInfo.InvariantCulture))
+                .BindIntNamedParam<MpiConfigurationOptions.NumberOfTasks>(numTasks.ToString(CultureInfo.InvariantCulture))
+                .Build();
+
+            IConfiguration merged = Configurations.Merge(driverConfig, mpiDriverConfig);
+
+            // Names of the assemblies that define the types the example depends on.
+            HashSet<string> appDlls = new HashSet<string>
+            {
+                typeof(IDriver).Assembly.GetName().Name,
+                typeof(ITask).Assembly.GetName().Name,
+                typeof(BroadcastReduceDriver).Assembly.GetName().Name,
+                typeof(INameClient).Assembly.GetName().Name,
+                typeof(INetworkService<>).Assembly.GetName().Name
+            };
+
+            ClrClientHelper.Run(
+                appDlls,
+                merged,
+                new DriverSubmissionSettings() { RunOnYarn = runOnYarn, JavaLogLevel = JavaLoggingSetting.VERBOSE });
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Network.Examples.Client/Org.Apache.REEF.Network.Examples.Client.csproj
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Examples.Client/Org.Apache.REEF.Network.Examples.Client.csproj
 
b/lang/cs/Org.Apache.REEF.Network.Examples.Client/Org.Apache.REEF.Network.Examples.Client.csproj
new file mode 100644
index 0000000..6a65ae5
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Network.Examples.Client/Org.Apache.REEF.Network.Examples.Client.csproj
@@ -0,0 +1,116 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<Project ToolsVersion="12.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+  <Import 
Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props"
 
Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')"
 />
+  <PropertyGroup>
+    <ProjectGuid>{E6DA8BC4-A346-48A7-99E3-D47ADD7DB975}</ProjectGuid>
+    <OutputType>Exe</OutputType>
+    <AppDesignerFolder>Properties</AppDesignerFolder>
+    <RootNamespace>Org.Apache.REEF.Network.Examples.Client</RootNamespace>
+    <AssemblyName>Org.Apache.REEF.Network.Examples.Client</AssemblyName>
+    <TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
+    <FileAlignment>512</FileAlignment>
+    <SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == 
'*Undefined*'">..</SolutionDir>
+    <RestorePackages>true</RestorePackages>
+  </PropertyGroup>
+  <Import Project="$(SolutionDir)\build.props" />
+  <PropertyGroup>
+    <BuildPackage>false</BuildPackage>
+    <UseVSHostingProcess>false</UseVSHostingProcess>
+  </PropertyGroup>
+  <ItemGroup>
+    <Reference Include="System" />
+    <Reference Include="System.Core" />
+    <Reference Include="System.Xml.Linq" />
+    <Reference Include="System.Data.DataSetExtensions" />
+    <Reference Include="Microsoft.CSharp" />
+    <Reference Include="System.Data" />
+    <Reference Include="System.Xml" />
+  </ItemGroup>
+  <ItemGroup>
+    <Compile Include="BroadcastAndReduceClient.cs" />
+    <Compile Include="PipelineBroadcastAndReduceClient.cs" />
+    <Compile Include="Run.cs" />
+    <Compile Include="Properties\AssemblyInfo.cs" />
+  </ItemGroup>
+  <ItemGroup>
+    <None Include="run.cmd">
+      <CopyToOutputDirectory>Always</CopyToOutputDirectory>
+    </None>
+  </ItemGroup>
+  <ItemGroup>
+    <ProjectReference 
Include="$(SolutionDir)\Org.Apache.REEF.Bridge\Org.Apache.REEF.Bridge.vcxproj">
+      <Project>{4e69d40a-26d6-4d4a-b96d-729946c07fe1}</Project>
+      <Name>Org.Apache.REEF.Bridge</Name>
+    </ProjectReference>
+    <ProjectReference 
Include="$(SolutionDir)\Org.Apache.REEF.Client\Org.Apache.REEF.Client.csproj">
+      <Project>{5094c35b-4fdb-4322-ac05-45d684501cbf}</Project>
+      <Name>Org.Apache.REEF.Client</Name>
+    </ProjectReference>
+    <ProjectReference 
Include="$(SolutionDir)\Org.Apache.REEF.Common\Org.Apache.REEF.Common.csproj">
+      <Project>{545a0582-4105-44ce-b99c-b1379514a630}</Project>
+      <Name>Org.Apache.REEF.Common</Name>
+    </ProjectReference>
+    <ProjectReference 
Include="$(SolutionDir)\Org.Apache.REEF.Driver\Org.Apache.REEF.Driver.csproj">
+      <Project>{a6baa2a7-f52f-4329-884e-1bcf711d6805}</Project>
+      <Name>Org.Apache.REEF.Driver</Name>
+    </ProjectReference>
+    <ProjectReference 
Include="$(SolutionDir)\Org.Apache.REEF.Evaluator\Org.Apache.REEF.Evaluator.csproj">
+      <Project>{1b983182-9c30-464c-948d-f87eb93a8240}</Project>
+      <Name>Org.Apache.REEF.Evaluator</Name>
+    </ProjectReference>
+    <ProjectReference 
Include="$(SolutionDir)\Org.Apache.REEF.Network.Examples\Org.Apache.REEF.Network.Examples.csproj">
+      <Project>{b1b43b60-ddd0-4805-a9b4-ba84a0ccb7c7}</Project>
+      <Name>Org.Apache.REEF.Network.Examples</Name>
+    </ProjectReference>
+    <ProjectReference 
Include="$(SolutionDir)\Org.Apache.REEF.Network\Org.Apache.REEF.Network.csproj">
+      <Project>{883ce800-6a6a-4e0a-b7fe-c054f4f2c1dc}</Project>
+      <Name>Org.Apache.REEF.Network</Name>
+    </ProjectReference>
+    <ProjectReference 
Include="$(SolutionDir)\Org.Apache.REEF.Tang\Org.Apache.REEF.Tang.csproj">
+      <Project>{97dbb573-3994-417a-9f69-ffa25f00d2a6}</Project>
+      <Name>Org.Apache.REEF.Tang</Name>
+    </ProjectReference>
+    <ProjectReference 
Include="$(SolutionDir)\Org.Apache.REEF.Utilities\Org.Apache.REEF.Utilities.csproj">
+      <Project>{79e7f89a-1dfb-45e1-8d43-d71a954aeb98}</Project>
+      <Name>Org.Apache.REEF.Utilities</Name>
+    </ProjectReference>
+    <ProjectReference 
Include="$(SolutionDir)\Org.Apache.REEF.Wake\Org.Apache.REEF.Wake.csproj">
+      <Project>{cdfb3464-4041-42b1-9271-83af24cd5008}</Project>
+      <Name>Org.Apache.REEF.Wake</Name>
+    </ProjectReference>
+  </ItemGroup>
+  <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+  <Import Project="$(SolutionDir)\.nuget\NuGet.targets" 
Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" />
+  <!--begin jar reference-->
+  <PropertyGroup>
+    <AfterBuildDependsOn>
+      $(AfterBuildDependsOn);
+      CopyJarFiles;
+    </AfterBuildDependsOn>
+  </PropertyGroup>
+  <Target Name="AfterBuild" DependsOnTargets="$(AfterBuildDependsOn);" />
+  <!--end jar reference-->
+  <!-- To modify your build process, add your task inside one of the targets 
below and uncomment it. 
+       Other similar extension points exist, see Microsoft.Common.targets.
+  <Target Name="BeforeBuild">
+  </Target>
+  <Target Name="AfterBuild">
+  </Target>
+  -->
+</Project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Network.Examples.Client/PipelineBroadcastAndReduceClient.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Examples.Client/PipelineBroadcastAndReduceClient.cs
 
b/lang/cs/Org.Apache.REEF.Network.Examples.Client/PipelineBroadcastAndReduceClient.cs
new file mode 100644
index 0000000..570b227
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Network.Examples.Client/PipelineBroadcastAndReduceClient.cs
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Org.Apache.REEF.Common.Io;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Bridge;
+using Org.Apache.REEF.Network.Examples.GroupCommunication;
+using 
Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastReduceDriverAndTasks;
+using Org.Apache.REEF.Network.Group.Config;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Network.Examples.Client
+{
+    /// <summary>
+    /// Client that submits the pipelined broadcast-and-reduce group communication example,
+    /// using PipelinedBroadcastReduceDriver as the driver-side event handler.
+    /// </summary>
+    public class PipelineBroadcastAndReduceClient
+    {
+        /// <summary>
+        /// Builds the driver bridge configuration, merges it with the group communication
+        /// options taken from GroupTestConstants, and submits the result through
+        /// ClrClientHelper.Run.
+        /// </summary>
+        /// <param name="runOnYarn">Forwarded as DriverSubmissionSettings.RunOnYarn.</param>
+        /// <param name="numTasks">
+        /// Bound to both GroupTestConfig.NumEvaluators and MpiConfigurationOptions.NumberOfTasks.
+        /// </param>
+        public void RunPipelineBroadcastAndReduce(bool runOnYarn, int numTasks)
+        {
+            // PipelinedBroadcastReduceDriver is registered for every driver event used by the example.
+            IConfiguration driverConfig = TangFactory.GetTang().NewConfigurationBuilder(
+                DriverBridgeConfiguration.ConfigurationModule
+                    .Set(DriverBridgeConfiguration.OnDriverStarted, GenericType<PipelinedBroadcastReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.OnEvaluatorAllocated, GenericType<PipelinedBroadcastReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.OnEvaluatorRequested, GenericType<PipelinedBroadcastReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.OnEvaluatorFailed, GenericType<PipelinedBroadcastReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.OnContextActive, GenericType<PipelinedBroadcastReduceDriver>.Class)
+                    .Set(DriverBridgeConfiguration.CustomTraceLevel, Level.Info.ToString())
+                    .Build())
+                .BindNamedParameter<GroupTestConfig.NumIterations, int>(
+                    GenericType<GroupTestConfig.NumIterations>.Class,
+                    GroupTestConstants.NumIterations.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter<GroupTestConfig.NumEvaluators, int>(
+                    GenericType<GroupTestConfig.NumEvaluators>.Class,
+                    numTasks.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter<GroupTestConfig.ChunkSize, int>(
+                    GenericType<GroupTestConfig.ChunkSize>.Class,
+                    GroupTestConstants.ChunkSize.ToString(CultureInfo.InvariantCulture))
+                .Build();
+
+            // Numeric options are passed as invariant-culture strings (converted exactly once).
+            IConfiguration mpiDriverConfig = TangFactory.GetTang().NewConfigurationBuilder()
+                .BindStringNamedParam<MpiConfigurationOptions.DriverId>(GroupTestConstants.DriverId)
+                .BindStringNamedParam<MpiConfigurationOptions.MasterTaskId>(GroupTestConstants.MasterTaskId)
+                .BindStringNamedParam<MpiConfigurationOptions.GroupName>(GroupTestConstants.GroupName)
+                .BindIntNamedParam<MpiConfigurationOptions.FanOut>(GroupTestConstants.FanOut.ToString(CultureInfo.InvariantCulture))
+                .BindIntNamedParam<MpiConfigurationOptions.NumberOfTasks>(numTasks.ToString(CultureInfo.InvariantCulture))
+                .Build();
+
+            IConfiguration merged = Configurations.Merge(driverConfig, mpiDriverConfig);
+
+            // Names of the assemblies that define the types the example depends on.
+            HashSet<string> appDlls = new HashSet<string>
+            {
+                typeof(IDriver).Assembly.GetName().Name,
+                typeof(ITask).Assembly.GetName().Name,
+                typeof(PipelinedBroadcastReduceDriver).Assembly.GetName().Name,
+                typeof(INameClient).Assembly.GetName().Name,
+                typeof(INetworkService<>).Assembly.GetName().Name
+            };
+
+            ClrClientHelper.Run(
+                appDlls,
+                merged,
+                new DriverSubmissionSettings() { RunOnYarn = runOnYarn, JavaLogLevel = JavaLoggingSetting.VERBOSE });
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Network.Examples.Client/Properties/AssemblyInfo.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Examples.Client/Properties/AssemblyInfo.cs 
b/lang/cs/Org.Apache.REEF.Network.Examples.Client/Properties/AssemblyInfo.cs
new file mode 100644
index 0000000..9716526
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network.Examples.Client/Properties/AssemblyInfo.cs
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+
+// General Information about an assembly is controlled through the following 
+// set of attributes. Change these attribute values to modify the information
+// associated with an assembly.
+// The title and product carry this project's own assembly name
+// (Org.Apache.REEF.Network.Examples.Client), not that of another project.
+[assembly: AssemblyTitle("Org.Apache.REEF.Network.Examples.Client")]
+[assembly: AssemblyDescription("")]
+[assembly: AssemblyConfiguration("")]
+[assembly: AssemblyCompany("")]
+[assembly: AssemblyProduct("Org.Apache.REEF.Network.Examples.Client")]
+[assembly: AssemblyCopyright("Copyright ©  2015")]
+[assembly: AssemblyTrademark("")]
+[assembly: AssemblyCulture("")]
+
+// Setting ComVisible to false makes the types in this assembly not visible 
+// to COM components.  If you need to access a type in this assembly from 
+// COM, set the ComVisible attribute to true on that type.
+[assembly: ComVisible(false)]
+
+// The following GUID is for the ID of the typelib if this project is exposed to COM
+[assembly: Guid("22ae839f-8ae7-46ac-b4bd-6d0d32213d83")]
+
+// Version information for an assembly consists of the following four values:
+//
+//      Major Version
+//      Minor Version 
+//      Build Number
+//      Revision
+//
+// You can specify all the values or you can default the Build and Revision Numbers 
+// by using the '*' as shown below:
+// [assembly: AssemblyVersion("1.0.*")]
+[assembly: AssemblyVersion("1.0.0.0")]
+[assembly: AssemblyFileVersion("1.0.0.0")]

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Network.Examples.Client/Run.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Examples.Client/Run.cs 
b/lang/cs/Org.Apache.REEF.Network.Examples.Client/Run.cs
new file mode 100644
index 0000000..899e548
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network.Examples.Client/Run.cs
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+
+namespace Org.Apache.REEF.Network.Examples.Client
+{
+    /// <summary>
+    /// Console entry point for the group communication example client.
+    /// </summary>
+    public class Run
+    {
+        // Number of tasks passed to each example run.
+        private const int NumTasks = 9;
+
+        /// <summary>
+        /// Runs the selected group communication examples.
+        /// </summary>
+        /// <param name="args">
+        /// args[0] (optional): "true" or "false", forwarded as the runOnYarn flag; defaults to false.
+        /// args[1..] (optional): names of the examples to run, matched case-insensitively:
+        /// "RunPipelineBroadcastAndReduce", "RunBroadcastAndReduce" or "all".
+        /// When no example name is given, all examples are run.
+        /// </param>
+        static void Main(string[] args)
+        {
+            Console.WriteLine("start running client: " + DateTime.Now);
+
+            bool runOnYarn = false;
+            if (args != null && args.Length > 0 && !bool.TryParse(args[0], out runOnYarn))
+            {
+                // Report a bad flag instead of crashing with an unhandled FormatException,
+                // while still signalling failure to the calling script.
+                Console.WriteLine("Invalid runOnYarn value '" + args[0] + "': expected 'true' or 'false'.");
+                Console.WriteLine("Usage: <runOnYarn: true|false> [RunPipelineBroadcastAndReduce|RunBroadcastAndReduce|all] ...");
+                Environment.ExitCode = 1;
+                return;
+            }
+
+            // An ordinal, case-insensitive set avoids culture-dependent lower-casing of names.
+            HashSet<string> testToRun = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
+            if (args != null)
+            {
+                for (int i = 1; i < args.Length; i++)
+                {
+                    testToRun.Add(args[i]);
+                }
+            }
+
+            bool runAll = testToRun.Count == 0 || testToRun.Contains("all");
+
+            if (runAll || testToRun.Contains("RunPipelineBroadcastAndReduce"))
+            {
+                new PipelineBroadcastAndReduceClient().RunPipelineBroadcastAndReduce(runOnYarn, NumTasks);
+                Console.WriteLine("RunPipelineBroadcastAndReduce completed!!!");
+            }
+
+            if (runAll || testToRun.Contains("RunBroadcastAndReduce"))
+            {
+                new BroadcastAndReduceClient().RunBroadcastAndReduce(runOnYarn, NumTasks);
+                Console.WriteLine("RunBroadcastAndReduce completed!!!");
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Network.Examples.Client/run.cmd
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network.Examples.Client/run.cmd 
b/lang/cs/Org.Apache.REEF.Network.Examples.Client/run.cmd
new file mode 100644
index 0000000..bfdd44e
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Network.Examples.Client/run.cmd
@@ -0,0 +1,45 @@
+@REM
+@REM Copyright (C) 2013 Microsoft Corporation
+@REM
+@REM Licensed under the Apache License, Version 2.0 (the "License");
+@REM you may not use this file except in compliance with the License.
+@REM You may obtain a copy of the License at
+@REM
+@REM         http://www.apache.org/licenses/LICENSE-2.0
+@REM
+@REM Unless required by applicable law or agreed to in writing, software
+@REM distributed under the License is distributed on an "AS IS" BASIS,
+@REM WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+@REM See the License for the specific language governing permissions and
+@REM limitations under the License.
+@REM
+
+@echo off
+::
+:: Copyright (C) 2013 Microsoft Corporation
+::
+:: Licensed under the Apache License, Version 2.0 (the "License");
+:: you may not use this file except in compliance with the License.
+:: You may obtain a copy of the License at
+::
+::         http://www.apache.org/licenses/LICENSE-2.0
+::
+:: Unless required by applicable law or agreed to in writing, software
+:: distributed under the License is distributed on an "AS IS" BASIS,
+:: WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+:: See the License for the specific language governing permissions and
+:: limitations under the License.
+::
+
+
+:: RUNTIME
+:: Shaded REEF bridge jar, looked up relative to the current directory.
+set SHADED_JAR=.\reef-bridge-java-0.11.0-incubating-SNAPSHOT-shaded.jar
+
+:: JVM option selecting the REEF logging configuration class.
+:: It is defined here but not added to the command line below.
+set LOGGING_CONFIG=-Djava.util.logging.config.class=org.apache.reef.util.logging.Config
+
+:: Hadoop jars (hdfs, common, mapreduce, yarn and their lib folders) under HADOOP_HOME.
+set CLASSPATH=%HADOOP_HOME%\share\hadoop\hdfs\lib\*;%HADOOP_HOME%\share\hadoop\hdfs\*;%HADOOP_HOME%\share\hadoop\common\*;%HADOOP_HOME%\share\hadoop\common\lib\*;%HADOOP_HOME%\share\hadoop\mapreduce\lib\*;%HADOOP_HOME%\share\hadoop\mapreduce\*;%HADOOP_HOME%\share\hadoop\yarn\*;%HADOOP_HOME%\share\hadoop\yarn\lib\*
+
+:: Quote the java executable and the class path so that JAVA_HOME / HADOOP_HOME values
+:: containing spaces (for example under "C:\Program Files") stay single arguments.
+:: All arguments given to this script (%*) are forwarded to java unchanged.
+set CMD="%JAVA_HOME%\bin\java.exe" -cp "%HADOOP_HOME%\etc\hadoop;%SHADED_JAR%;%CLASSPATH%" %*
+::%LOGGING_CONFIG%
+echo %CMD%
+%CMD%

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs
 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs
new file mode 100644
index 0000000..c6b8578
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/BroadcastReduceDriver.cs
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+using System.Linq;
+using Org.Apache.REEF.Common.Io;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Bridge;
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Remote.Impl;
+
+namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDriverAndTasks
+{
+    /// <summary>
+    /// Driver of the broadcast/reduce group communication example.
+    /// </summary>
+    /// <remarks>
+    /// The driver requests evaluators, submits the group communication context and service
+    /// configurations to every allocated evaluator and queues one task per active context:
+    /// a <see cref="MasterTask"/> on the master task context and a <see cref="SlaveTask"/>
+    /// on every other context.
+    /// </remarks>
+    public class BroadcastReduceDriver : IStartHandler, IObserver<IEvaluatorRequestor>,
+        IObserver<IAllocatedEvaluator>, IObserver<IActiveContext>, IObserver<IFailedEvaluator>
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(BroadcastReduceDriver));
+
+        private readonly int _numEvaluators;
+        private readonly int _numIterations;
+
+        private readonly IMpiDriver _mpiDriver;
+        private readonly ICommunicationGroupDriver _commGroup;
+        private readonly TaskStarter _mpiTaskStarter;
+
+        /// <summary>
+        /// Registers the Broadcast and Reduce operators on the default communication group,
+        /// both rooted at the master task, and prepares the task starter.
+        /// </summary>
+        /// <param name="numEvaluators">Number of evaluators to request.</param>
+        /// <param name="numIterations">Iteration count forwarded to the tasks.</param>
+        /// <param name="mpiDriver">Group communication driver.</param>
+        [Inject]
+        public BroadcastReduceDriver(
+            [Parameter(typeof(GroupTestConfig.NumEvaluators))] int numEvaluators,
+            [Parameter(typeof(GroupTestConfig.NumIterations))] int numIterations,
+            MpiDriver mpiDriver)
+        {
+            Identifier = "BroadcastStartHandler";
+            _numEvaluators = numEvaluators;
+            _numIterations = numIterations;
+            _mpiDriver = mpiDriver;
+            _commGroup = _mpiDriver.DefaultGroup
+                .AddBroadcast<int, IntCodec>(
+                    GroupTestConstants.BroadcastOperatorName,
+                    GroupTestConstants.MasterTaskId)
+                .AddReduce<int, IntCodec>(
+                    GroupTestConstants.ReduceOperatorName,
+                    GroupTestConstants.MasterTaskId,
+                    new SumFunction())
+                .Build();
+
+            _mpiTaskStarter = new TaskStarter(_mpiDriver, numEvaluators);
+
+            CreateClassHierarchy();
+        }
+
+        public string Identifier { get; set; }
+
+        /// <summary>
+        /// Submits a single request for <c>_numEvaluators</c> evaluators.
+        /// </summary>
+        public void OnNext(IEvaluatorRequestor evaluatorRequestor)
+        {
+            EvaluatorRequest request =
+                new EvaluatorRequest(_numEvaluators, 512, 2, "WonderlandRack", "BroadcastEvaluator");
+            evaluatorRequestor.Submit(request);
+        }
+
+        /// <summary>
+        /// Submits the group communication context and service configurations to a new evaluator.
+        /// </summary>
+        public void OnNext(IAllocatedEvaluator allocatedEvaluator)
+        {
+            IConfiguration contextConf = _mpiDriver.GetContextConfiguration();
+            IConfiguration serviceConf = _mpiDriver.GetServiceConfiguration();
+            allocatedEvaluator.SubmitContextAndService(contextConf, serviceConf);
+        }
+
+        /// <summary>
+        /// Queues the master task on the master task context and a slave task on any other context.
+        /// </summary>
+        public void OnNext(IActiveContext activeContext)
+        {
+            if (_mpiDriver.IsMasterTaskContext(activeContext))
+            {
+                // Configure Master Task
+                QueueTaskWithExampleParameters(
+                    GroupTestConstants.MasterTaskId,
+                    TaskConfiguration.ConfigurationModule
+                        .Set(TaskConfiguration.Identifier, GroupTestConstants.MasterTaskId)
+                        .Set(TaskConfiguration.Task, GenericType<MasterTask>.Class)
+                        .Build(),
+                    activeContext);
+            }
+            else
+            {
+                // Configure Slave Task; the context id keeps the task ids distinct.
+                string slaveTaskId = GroupTestConstants.SlaveTaskId + activeContext.Id;
+                QueueTaskWithExampleParameters(
+                    slaveTaskId,
+                    TaskConfiguration.ConfigurationModule
+                        .Set(TaskConfiguration.Identifier, slaveTaskId)
+                        .Set(TaskConfiguration.Task, GenericType<SlaveTask>.Class)
+                        .Build(),
+                    activeContext);
+            }
+        }
+
+        /// <summary>
+        /// Logs the failed evaluator. No recovery is attempted.
+        /// </summary>
+        public void OnNext(IFailedEvaluator value)
+        {
+            LOGGER.Log(Level.Error, "Evaluator failed: {0}", value.Id);
+        }
+
+        /// <summary>
+        /// Logs the reported error. No recovery is attempted.
+        /// </summary>
+        public void OnError(Exception error)
+        {
+            LOGGER.Log(Level.Error, "Error reported to BroadcastReduceDriver: {0}", error);
+        }
+
+        public void OnCompleted()
+        {
+        }
+
+        /// <summary>
+        /// Adds the NumEvaluators and NumIterations named parameters to a task configuration,
+        /// registers the task id with the communication group and queues the task.
+        /// </summary>
+        /// <param name="taskId">Identifier of the task being queued.</param>
+        /// <param name="taskConf">Task configuration carrying the identifier and task type.</param>
+        /// <param name="activeContext">Context the task is queued for.</param>
+        private void QueueTaskWithExampleParameters(
+            string taskId, IConfiguration taskConf, IActiveContext activeContext)
+        {
+            IConfiguration partialTaskConf = TangFactory.GetTang().NewConfigurationBuilder(taskConf)
+                .BindNamedParameter<GroupTestConfig.NumEvaluators, int>(
+                    GenericType<GroupTestConfig.NumEvaluators>.Class,
+                    _numEvaluators.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter<GroupTestConfig.NumIterations, int>(
+                    GenericType<GroupTestConfig.NumIterations>.Class,
+                    _numIterations.ToString(CultureInfo.InvariantCulture))
+                .Build();
+
+            _commGroup.AddTask(taskId);
+            _mpiTaskStarter.QueueTask(partialTaskConf, activeContext);
+        }
+
+        /// <summary>
+        /// Passes the names of the assemblies declaring the driver, task, name client and
+        /// network service types to <c>ClrHandlerHelper.GenerateClassHierarchy</c>.
+        /// </summary>
+        private void CreateClassHierarchy()
+        {
+            HashSet<string> clrDlls = new HashSet<string>();
+            clrDlls.Add(typeof(IDriver).Assembly.GetName().Name);
+            clrDlls.Add(typeof(ITask).Assembly.GetName().Name);
+            clrDlls.Add(typeof(BroadcastReduceDriver).Assembly.GetName().Name);
+            clrDlls.Add(typeof(INameClient).Assembly.GetName().Name);
+            clrDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name);
+
+            ClrHandlerHelper.GenerateClassHierarchy(clrDlls);
+        }
+
+        /// <summary>
+        /// Reduce function that adds up all received integers.
+        /// </summary>
+        private class SumFunction : IReduceFunction<int>
+        {
+            [Inject]
+            public SumFunction()
+            {
+            }
+
+            public int Reduce(IEnumerable<int> elements)
+            {
+                return elements.Sum();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/MasterTask.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/MasterTask.cs
 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/MasterTask.cs
new file mode 100644
index 0000000..1ca2353
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/MasterTask.cs
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Linq;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Task;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDriverAndTasks
+{
+    /// <summary>
+    /// Master task of the broadcast/reduce example: on every iteration it broadcasts the
+    /// iteration number, receives the reduced sum and verifies it.
+    /// </summary>
+    public class MasterTask : ITask
+    {
+        private static readonly Logger _logger = Logger.GetLogger(typeof(MasterTask));
+
+        private readonly int _numIters;
+        private readonly int _numReduceSenders;
+
+        private readonly IMpiClient _mpiClient;
+        private readonly ICommunicationGroupClient _commGroup;
+        private readonly IBroadcastSender<int> _broadcastSender;
+        private readonly IReduceReceiver<int> _sumReducer;
+
+        /// <summary>
+        /// Looks up the example's communication group and its broadcast sender and reduce receiver.
+        /// </summary>
+        /// <param name="numIters">Number of broadcast/reduce rounds to run.</param>
+        /// <param name="numEvaluators">Total number of evaluators; all but one are counted as reduce senders.</param>
+        /// <param name="mpiClient">Group communication client.</param>
+        [Inject]
+        public MasterTask(
+            [Parameter(typeof(GroupTestConfig.NumIterations))] int numIters,
+            [Parameter(typeof(GroupTestConfig.NumEvaluators))] int numEvaluators,
+            IMpiClient mpiClient)
+        {
+            _logger.Log(Level.Info, "Hello from master task");
+            _numIters = numIters;
+            _numReduceSenders = numEvaluators - 1;
+            _mpiClient = mpiClient;
+
+            _commGroup = mpiClient.GetCommunicationGroup(GroupTestConstants.GroupName);
+            _broadcastSender = _commGroup.GetBroadcastSender<int>(GroupTestConstants.BroadcastOperatorName);
+            _sumReducer = _commGroup.GetReduceReceiver<int>(GroupTestConstants.ReduceOperatorName);
+        }
+
+        /// <summary>
+        /// Runs the broadcast/reduce rounds.
+        /// </summary>
+        /// <param name="memento">Not used.</param>
+        /// <returns>Always null.</returns>
+        /// <exception cref="Exception">
+        /// The reduced sum differs from TriangleNumber(i) multiplied by the number of reduce senders.
+        /// </exception>
+        public byte[] Call(byte[] memento)
+        {
+            for (int i = 1; i <= _numIters; i++)
+            {
+                // Broadcast the iteration number.
+                _broadcastSender.Send(i);
+
+                // Receive the reduced (summed) replies.
+                int sum = _sumReducer.Reduce();
+                _logger.Log(Level.Info, "Received sum: {0} on iteration: {1}", sum, i);
+
+                // Each of the _numReduceSenders senders is expected to contribute TriangleNumber(i).
+                int expected = TriangleNumber(i) * _numReduceSenders;
+                if (sum != expected)
+                {
+                    throw new Exception("Expected " + expected + " but got " + sum);
+                }
+            }
+
+            return null;
+        }
+
+        /// <summary>
+        /// Disposes the group communication client.
+        /// </summary>
+        public void Dispose()
+        {
+            _mpiClient.Dispose();
+        }
+
+        /// <summary>
+        /// Returns 1 + 2 + ... + n.
+        /// </summary>
+        private int TriangleNumber(int n)
+        {
+            return Enumerable.Range(1, n).Sum();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/SlaveTask.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/SlaveTask.cs
 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/SlaveTask.cs
new file mode 100644
index 0000000..c09bb80
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/BroadcastReduceDriverAndTasks/SlaveTask.cs
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Linq;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Task;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.BroadcastReduceDriverAndTasks
+{
+    /// <summary>
+    /// Slave task of the broadcast/reduce example: on every iteration it receives a number
+    /// through the broadcast operator and sends that number's triangle number through the
+    /// reduce operator.
+    /// </summary>
+    public class SlaveTask : ITask
+    {
+        private static readonly Logger _logger = Logger.GetLogger(typeof(SlaveTask));
+
+        private readonly int _numIterations;
+        private readonly IMpiClient _mpiClient;
+        private readonly ICommunicationGroupClient _commGroup;
+        private readonly IBroadcastReceiver<int> _broadcastReceiver;
+        private readonly IReduceSender<int> _triangleNumberSender;
+
+        /// <summary>
+        /// Looks up the example's communication group and its broadcast receiver and reduce sender.
+        /// </summary>
+        /// <param name="numIters">Number of broadcast/reduce rounds to take part in.</param>
+        /// <param name="mpiClient">Group communication client.</param>
+        [Inject]
+        public SlaveTask(
+            [Parameter(typeof(GroupTestConfig.NumIterations))] int numIters,
+            IMpiClient mpiClient)
+        {
+            _logger.Log(Level.Info, "Hello from slave task");
+
+            _numIterations = numIters;
+            _mpiClient = mpiClient;
+
+            ICommunicationGroupClient group = _mpiClient.GetCommunicationGroup(GroupTestConstants.GroupName);
+            _commGroup = group;
+            _broadcastReceiver = group.GetBroadcastReceiver<int>(GroupTestConstants.BroadcastOperatorName);
+            _triangleNumberSender = group.GetReduceSender<int>(GroupTestConstants.ReduceOperatorName);
+        }
+
+        /// <summary>
+        /// Runs the receive/compute/send rounds.
+        /// </summary>
+        /// <param name="memento">Not used.</param>
+        /// <returns>Always null.</returns>
+        public byte[] Call(byte[] memento)
+        {
+            int iteration = 0;
+            while (iteration < _numIterations)
+            {
+                // Value received through the broadcast operator.
+                int received = _broadcastReceiver.Receive();
+                _logger.Log(Level.Info, "Calculating TriangleNumber({0}) on slave task...", received);
+
+                // Reply with the matching triangle number through the reduce operator.
+                int reply = TriangleNumber(received);
+                _logger.Log(Level.Info, "Sending sum: {0} on iteration {1}.", reply, iteration);
+                _triangleNumberSender.Send(reply);
+
+                iteration++;
+            }
+
+            return null;
+        }
+
+        /// <summary>
+        /// Disposes the group communication client.
+        /// </summary>
+        public void Dispose()
+        {
+            _mpiClient.Dispose();
+        }
+
+        /// <summary>
+        /// Returns 1 + 2 + ... + n.
+        /// </summary>
+        private int TriangleNumber(int n)
+        {
+            var oneToN = Enumerable.Range(1, n);
+            return oneToN.Sum();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/GroupTestConfig.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/GroupTestConfig.cs
 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/GroupTestConfig.cs
new file mode 100644
index 0000000..c7d93c1
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/GroupTestConfig.cs
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Network.Examples.GroupCommunication
+{
+    /// <summary>
+    /// Tang named parameters used to configure the group communication examples.
+    /// </summary>
+    public class GroupTestConfig
+    {
+        /// <summary>Number of iterations of messages to send.</summary>
+        [NamedParameter("Number of iterations of messages to send")]
+        public class NumIterations : Name<int> { }
+
+        /// <summary>Number of evaluators.</summary>
+        [NamedParameter("Number of Evaluators")]
+        public class NumEvaluators : Name<int> { }
+
+        /// <summary>Tree width.</summary>
+        [NamedParameter("tree width")]
+        public class FanOut : Name<int> { }
+
+        /// <summary>Id of the evaluator; declared as a string-valued parameter.</summary>
+        [NamedParameter("integer id of the evaluator")]
+        public class EvaluatorId : Name<string> { }
+
+        /// <summary>Size of the array.</summary>
+        [NamedParameter("Size of the array")]
+        public class ArraySize : Name<int> { }
+
+        /// <summary>Chunk size for pipelining.</summary>
+        [NamedParameter("Chunk size for pipelining")]
+        public class ChunkSize : Name<int> { }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/GroupTestConstants.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/GroupTestConstants.cs
 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/GroupTestConstants.cs
new file mode 100644
index 0000000..cbfebfb
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/GroupTestConstants.cs
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace Org.Apache.REEF.Network.Examples.GroupCommunication
+{
+    /// <summary>
+    /// Constants shared by the group communication example drivers and tasks.
+    /// </summary>
+    public class GroupTestConstants
+    {
+        // Driver and communication group identifiers.
+        public const string DriverId = "BroadcastReduceDriver";
+        public const string GroupName = "BroadcastReduceGroup";
+
+        // Names under which the group communication operators are registered and looked up.
+        public const string BroadcastOperatorName = "Broadcast";
+        public const string ReduceOperatorName = "Reduce";
+        public const string ScatterOperatorName = "Scatter";
+
+        // Task identifiers. SlaveTaskId ends with '-' and reads as a prefix for per-task ids.
+        public const string MasterTaskId = "MasterTask";
+        public const string SlaveTaskId = "SlaveTask-";
+
+        // Default sizes for the examples.
+        public const int NumIterations = 10;
+        public const int FanOut = 2;
+        public const int ChunkSize = 2;
+        public const int ArrayLength = 6;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs
 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs
new file mode 100644
index 0000000..d108d68
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedBroadcastReduceDriver.cs
@@ -0,0 +1,320 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+using System.Linq;
+using Org.Apache.REEF.Common.Io;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Bridge;
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Pipelining;
+using Org.Apache.REEF.Network.Group.Topology;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastReduceDriverAndTasks
+{
+    public class PipelinedBroadcastReduceDriver : IStartHandler, 
IObserver<IEvaluatorRequestor>, IObserver<IAllocatedEvaluator>, 
IObserver<IActiveContext>, IObserver<IFailedEvaluator>
+    {
+        private static readonly Logger Logger = 
Logger.GetLogger(typeof(PipelinedBroadcastReduceDriver));
+
+        private readonly int _numEvaluators;
+        private readonly int _numIterations;
+        private readonly int _chunkSize;
+
+        private readonly IMpiDriver _mpiDriver;
+        private readonly ICommunicationGroupDriver _commGroup;
+        private readonly TaskStarter _mpiTaskStarter;
+
+        /// <summary>
+        /// Registers pipelined Broadcast and Reduce operators over int arrays on the default
+        /// communication group, both rooted at the master task and using a tree topology,
+        /// and prepares the task starter.
+        /// </summary>
+        /// <param name="numEvaluators">Number of evaluators to request.</param>
+        /// <param name="numIterations">Iteration count forwarded to the tasks.</param>
+        /// <param name="chunkSize">Chunk size handed to the pipeline data converters.</param>
+        /// <param name="mpiDriver">Group communication driver.</param>
+        [Inject]
+        public PipelinedBroadcastReduceDriver(
+            [Parameter(typeof(GroupTestConfig.NumEvaluators))] int numEvaluators,
+            [Parameter(typeof(GroupTestConfig.NumIterations))] int numIterations,
+            [Parameter(typeof(GroupTestConfig.ChunkSize))] int chunkSize,
+            MpiDriver mpiDriver)
+        {
+            Logger.Log(Level.Info, "*******entering the driver code " + chunkSize);
+
+            Identifier = "BroadcastStartHandler";
+            _numEvaluators = numEvaluators;
+            _numIterations = numIterations;
+            _chunkSize = chunkSize;
+            _mpiDriver = mpiDriver;
+
+            // Build the group step by step: broadcast first, then reduce.
+            var withBroadcast = _mpiDriver.DefaultGroup.AddBroadcast<int[], IntArrayCodec>(
+                GroupTestConstants.BroadcastOperatorName,
+                GroupTestConstants.MasterTaskId,
+                TopologyTypes.Tree,
+                new PipelineIntDataConverter(_chunkSize));
+
+            var withReduce = withBroadcast.AddReduce<int[], IntArrayCodec>(
+                GroupTestConstants.ReduceOperatorName,
+                GroupTestConstants.MasterTaskId,
+                new ArraySumFunction(),
+                TopologyTypes.Tree,
+                new PipelineIntDataConverter(_chunkSize));
+
+            _commGroup = withReduce.Build();
+
+            _mpiTaskStarter = new TaskStarter(_mpiDriver, numEvaluators);
+
+            CreateClassHierarchy();
+        }
+
+        /// <summary>Identifier of this start handler; set in the constructor.</summary>
+        public string Identifier { get; set; }
+
+        /// <summary>
+        /// Submits a single request for <c>_numEvaluators</c> evaluators.
+        /// </summary>
+        public void OnNext(IEvaluatorRequestor evaluatorRequestor)
+        {
+            evaluatorRequestor.Submit(
+                new EvaluatorRequest(_numEvaluators, 512, 2, "WonderlandRack", "BroadcastEvaluator"));
+        }
+
+        /// <summary>
+        /// Submits the group communication context and service configurations to a new evaluator.
+        /// </summary>
+        public void OnNext(IAllocatedEvaluator allocatedEvaluator)
+        {
+            allocatedEvaluator.SubmitContextAndService(
+                _mpiDriver.GetContextConfiguration(),
+                _mpiDriver.GetServiceConfiguration());
+        }
+
+        /// <summary>
+        /// Queues the pipelined master task on the master task context and a pipelined slave
+        /// task on any other context.
+        /// </summary>
+        public void OnNext(IActiveContext activeContext)
+        {
+            if (_mpiDriver.IsMasterTaskContext(activeContext))
+            {
+                Logger.Log(Level.Info, "******* Master ID " + activeContext.Id);
+
+                // Configure Master Task
+                QueueTaskWithExampleParameters(
+                    GroupTestConstants.MasterTaskId,
+                    TaskConfiguration.ConfigurationModule
+                        .Set(TaskConfiguration.Identifier, GroupTestConstants.MasterTaskId)
+                        .Set(TaskConfiguration.Task, GenericType<PipelinedMasterTask>.Class)
+                        .Build(),
+                    activeContext);
+            }
+            else
+            {
+                // Configure Slave Task; the context id keeps the task ids distinct.
+                string slaveTaskId = GroupTestConstants.SlaveTaskId + activeContext.Id;
+                QueueTaskWithExampleParameters(
+                    slaveTaskId,
+                    TaskConfiguration.ConfigurationModule
+                        .Set(TaskConfiguration.Identifier, slaveTaskId)
+                        .Set(TaskConfiguration.Task, GenericType<PipelinedSlaveTask>.Class)
+                        .Build(),
+                    activeContext);
+            }
+        }
+
+        /// <summary>
+        /// Adds the NumEvaluators, NumIterations and ArraySize named parameters to a task
+        /// configuration, registers the task id with the communication group and queues the task.
+        /// </summary>
+        /// <param name="taskId">Identifier of the task being queued.</param>
+        /// <param name="taskConf">Task configuration carrying the identifier and task type.</param>
+        /// <param name="activeContext">Context the task is queued for.</param>
+        private void QueueTaskWithExampleParameters(
+            string taskId, IConfiguration taskConf, IActiveContext activeContext)
+        {
+            IConfiguration partialTaskConf = TangFactory.GetTang().NewConfigurationBuilder(taskConf)
+                .BindNamedParameter<GroupTestConfig.NumEvaluators, int>(
+                    GenericType<GroupTestConfig.NumEvaluators>.Class,
+                    _numEvaluators.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter<GroupTestConfig.NumIterations, int>(
+                    GenericType<GroupTestConfig.NumIterations>.Class,
+                    _numIterations.ToString(CultureInfo.InvariantCulture))
+                .BindNamedParameter<GroupTestConfig.ArraySize, int>(
+                    GenericType<GroupTestConfig.ArraySize>.Class,
+                    GroupTestConstants.ArrayLength.ToString(CultureInfo.InvariantCulture))
+                .Build();
+
+            _commGroup.AddTask(taskId);
+            _mpiTaskStarter.QueueTask(partialTaskConf, activeContext);
+        }
+
+        /// <summary>
+        /// Logs the failed evaluator. No recovery is attempted.
+        /// </summary>
+        public void OnNext(IFailedEvaluator value)
+        {
+            Logger.Log(Level.Error, "Evaluator failed: {0}", value.Id);
+        }
+
+        /// <summary>
+        /// Logs the reported error. No recovery is attempted.
+        /// </summary>
+        public void OnError(Exception error)
+        {
+            Logger.Log(Level.Error, "Error reported to PipelinedBroadcastReduceDriver: {0}", error);
+        }
+
+        /// <summary>
+        /// Intentionally empty: the driver does nothing on completion.
+        /// </summary>
+        public void OnCompleted()
+        {
+        }
+
+        /// <summary>
+        /// Passes the names of the assemblies declaring the driver, task, name client and
+        /// network service types to <c>ClrHandlerHelper.GenerateClassHierarchy</c>.
+        /// </summary>
+        private void CreateClassHierarchy()
+        {
+            HashSet<string> clrDlls = new HashSet<string>
+            {
+                typeof(IDriver).Assembly.GetName().Name,
+                typeof(ITask).Assembly.GetName().Name,
+                typeof(PipelinedBroadcastReduceDriver).Assembly.GetName().Name,
+                typeof(INameClient).Assembly.GetName().Name,
+                typeof(INetworkService<>).Assembly.GetName().Name
+            };
+
+            ClrHandlerHelper.GenerateClassHierarchy(clrDlls);
+        }
+
+        /// <summary>
+        /// Reduce function that adds up all received integers.
+        /// </summary>
+        private class SumFunction : IReduceFunction<int>
+        {
+            [Inject]
+            public SumFunction()
+            {
+            }
+
+            /// <summary>Returns the sum of <paramref name="elements"/>.</summary>
+            public int Reduce(IEnumerable<int> elements)
+            {
+                int total = Enumerable.Sum(elements);
+                return total;
+            }
+        }
+
+        /// <summary>
+        /// Reduce function that adds integer arrays element by element.
+        /// </summary>
+        private class ArraySumFunction : IReduceFunction<int[]>
+        {
+            [Inject]
+            public ArraySumFunction()
+            {
+            }
+
+            /// <summary>
+            /// Sums the given arrays position by position.
+            /// </summary>
+            /// <param name="elements">Arrays to add; all must have the same length.</param>
+            /// <returns>
+            /// A new array holding the element-wise sums, or null when
+            /// <paramref name="elements"/> is empty. The input arrays are not modified.
+            /// </returns>
+            /// <exception cref="Exception">An array's length differs from the first array's length.</exception>
+            public int[] Reduce(IEnumerable<int[]> elements)
+            {
+                int[] result = null;
+
+                foreach (int[] element in elements)
+                {
+                    if (result == null)
+                    {
+                        // Copy the first array so the caller's data is never modified.
+                        result = (int[])element.Clone();
+                        continue;
+                    }
+
+                    if (element.Length != result.Length)
+                    {
+                        throw new Exception("integer arrays are of different sizes");
+                    }
+
+                    for (int i = 0; i < result.Length; i++)
+                    {
+                        result[i] += element[i];
+                    }
+                }
+
+                return result;
+            }
+        }
+
+
+        /// <summary>
+        /// Codec that converts an int array to and from its raw in-memory bytes.
+        /// </summary>
+        private class IntArrayCodec : ICodec<int[]>
+        {
+            /// <summary>
+            /// Parameterless constructor marked with [Inject].
+            /// </summary>
+            [Inject]
+            public IntArrayCodec()
+            {
+            }
+
+            /// <summary>
+            /// Copies the bytes of <paramref name="obj"/> into a new byte array.
+            /// </summary>
+            /// <param name="obj">The integers to encode.</param>
+            /// <returns>A byte array of sizeof(Int32) bytes per input element.</returns>
+            public byte[] Encode(int[] obj)
+            {
+                int byteCount = sizeof(Int32) * obj.Length;
+                var encoded = new byte[byteCount];
+                Buffer.BlockCopy(obj, 0, encoded, 0, byteCount);
+                return encoded;
+            }
+
+            /// <summary>
+            /// Rebuilds an int array from bytes laid out as in <see cref="Encode"/>.
+            /// </summary>
+            /// <param name="data">The encoded bytes; the length must be a multiple of sizeof(Int32).</param>
+            /// <returns>The decoded integers.</returns>
+            /// <exception cref="Exception">The byte count is not a multiple of sizeof(Int32).</exception>
+            public int[] Decode(byte[] data)
+            {
+                int byteCount = data.Length;
+                if (byteCount % sizeof(Int32) != 0)
+                {
+                    throw new Exception("error inside integer array decoder, byte array length not a multiple of interger size");
+                }
+
+                var decoded = new int[byteCount / sizeof(Int32)];
+                Buffer.BlockCopy(data, 0, decoded, 0, byteCount);
+                return decoded;
+            }
+        }
+
+        /// <summary>
+        /// Splits an int array into chunks of at most the configured chunk size
+        /// and joins such chunks back into one array.
+        /// </summary>
+        public class PipelineIntDataConverter : IPipelineDataConverter<int[]>
+        {
+            private readonly int _chunkSize;
+
+            /// <summary>
+            /// Creates a converter with the chunk size bound to GroupTestConfig.ChunkSize.
+            /// </summary>
+            /// <param name="chunkSize">Maximum number of integers per chunk.</param>
+            [Inject]
+            public PipelineIntDataConverter([Parameter(typeof(GroupTestConfig.ChunkSize))] int chunkSize)
+            {
+                _chunkSize = chunkSize;
+            }
+
+            /// <summary>
+            /// Cuts <paramref name="message"/> into consecutive chunks; only the final chunk
+            /// is created with its second constructor argument set to true.
+            /// </summary>
+            /// <param name="message">The full array to split.</param>
+            /// <returns>The chunks in order; empty when <paramref name="message"/> is empty.</returns>
+            public List<PipelineMessage<int[]>> PipelineMessage(int[] message)
+            {
+                var chunks = new List<PipelineMessage<int[]>>();
+
+                // Number of chunks = ceiling(message.Length / _chunkSize).
+                int chunkCount = message.Length / _chunkSize;
+                if (message.Length % _chunkSize != 0)
+                {
+                    chunkCount++;
+                }
+
+                int chunkIndex = 0;
+                int start = 0;
+                while (start < message.Length)
+                {
+                    // The last chunk may be shorter than _chunkSize.
+                    int[] chunk = new int[Math.Min(_chunkSize, message.Length - start)];
+
+                    // Buffer.BlockCopy takes byte offsets and byte counts, hence the sizeof(int) scaling.
+                    Buffer.BlockCopy(message, start * sizeof(int), chunk, 0, chunk.Length * sizeof(int));
+
+                    bool isFinalChunk = chunkIndex == chunkCount - 1;
+                    chunks.Add(new PipelineMessage<int[]>(chunk, isFinalChunk));
+
+                    chunkIndex++;
+                    start += _chunkSize;
+                }
+
+                return chunks;
+            }
+
+            /// <summary>
+            /// Concatenates the data of all chunks, in list order, into one array.
+            /// </summary>
+            /// <param name="pipelineMessage">The chunks to join.</param>
+            /// <returns>A new array containing every chunk's data back to back.</returns>
+            public int[] FullMessage(List<PipelineMessage<int[]>> pipelineMessage)
+            {
+                int totalLength = pipelineMessage.Sum(part => part.Data.Length);
+                int[] combined = new int[totalLength];
+
+                // Tracked in bytes because Buffer.BlockCopy addresses the destination in bytes.
+                int byteOffset = 0;
+                foreach (var part in pipelineMessage)
+                {
+                    int partBytes = part.Data.Length * sizeof(int);
+                    Buffer.BlockCopy(part.Data, 0, combined, byteOffset, partBytes);
+                    byteOffset += partBytes;
+                }
+
+                return combined;
+            }
+
+            /// <summary>
+            /// Builds a configuration that binds GroupTestConfig.ChunkSize to this converter's chunk size.
+            /// </summary>
+            /// <returns>The built configuration.</returns>
+            public IConfiguration GetConfiguration()
+            {
+                string chunkSizeText = _chunkSize.ToString(CultureInfo.InvariantCulture);
+                var builder = TangFactory.GetTang().NewConfigurationBuilder();
+
+                return builder
+                    .BindNamedParameter<GroupTestConfig.ChunkSize, int>(GenericType<GroupTestConfig.ChunkSize>.Class, chunkSizeText)
+                    .Build();
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedMasterTask.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedMasterTask.cs
 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedMasterTask.cs
new file mode 100644
index 0000000..0e4ccee
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedMasterTask.cs
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Linq;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Task;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastReduceDriverAndTasks
+{
+    /// <summary>
+    /// Master task of the pipelined broadcast/reduce example. On each iteration it
+    /// broadcasts an array filled with the iteration number, receives the reduced
+    /// array, and checks every element against the expected value.
+    /// </summary>
+    public class PipelinedMasterTask : ITask
+    {
+        private static readonly Logger Logger = Logger.GetLogger(typeof(PipelinedMasterTask));
+
+        private readonly int _numIters;
+        private readonly int _numReduceSenders;
+        private readonly int _arraySize;
+
+        private readonly IMpiClient _mpiClient;
+        private readonly ICommunicationGroupClient _commGroup;
+        private readonly IBroadcastSender<int[]> _broadcastSender;
+        private readonly IReduceReceiver<int[]> _sumReducer;
+
+        /// <summary>
+        /// Stores the injected settings and looks up the broadcast sender and reduce
+        /// receiver of the communication group named GroupTestConstants.GroupName.
+        /// </summary>
+        /// <param name="numIters">Number of broadcast/reduce rounds to run.</param>
+        /// <param name="numEvaluators">Total evaluator count; one less is used as the number of reduce senders.</param>
+        /// <param name="arraySize">Length of the array that is broadcast each round.</param>
+        /// <param name="mpiClient">Client used to obtain the communication group; disposed in <see cref="Dispose"/>.</param>
+        [Inject]
+        public PipelinedMasterTask(
+            [Parameter(typeof(GroupTestConfig.NumIterations))] int numIters,
+            [Parameter(typeof(GroupTestConfig.NumEvaluators))] int numEvaluators,
+            [Parameter(typeof(GroupTestConfig.ArraySize))] int arraySize,
+            IMpiClient mpiClient)
+        {
+            Logger.Log(Level.Info, "Hello from master task");
+            _numIters = numIters;
+            _numReduceSenders = numEvaluators - 1;
+            _arraySize = arraySize;
+            _mpiClient = mpiClient;
+
+            _commGroup = mpiClient.GetCommunicationGroup(GroupTestConstants.GroupName);
+            _broadcastSender = _commGroup.GetBroadcastSender<int[]>(GroupTestConstants.BroadcastOperatorName);
+            _sumReducer = _commGroup.GetReduceReceiver<int[]>(GroupTestConstants.ReduceOperatorName);
+            Logger.Log(Level.Info, "finished master task constructor");
+        }
+
+        /// <summary>
+        /// Runs the broadcast/reduce rounds and validates each reduced array.
+        /// </summary>
+        /// <param name="memento">Not used.</param>
+        /// <returns>Always null.</returns>
+        /// <exception cref="Exception">
+        /// The reduced array is null, has a length other than the configured array size,
+        /// or contains an element different from TriangleNumber(i) * number of reduce senders.
+        /// </exception>
+        public byte[] Call(byte[] memento)
+        {
+            int[] intArr = new int[_arraySize];
+
+            for (int i = 1; i <= _numIters; i++)
+            {
+                for (int j = 0; j < _arraySize; j++)
+                {
+                    intArr[j] = i;
+                }
+
+                _broadcastSender.Send(intArr);
+                int[] sum = _sumReducer.Reduce();
+
+                // Check the shape first so a wrong-sized result is reported clearly
+                // instead of surfacing as an IndexOutOfRangeException below.
+                if (sum == null || sum.Length != _arraySize)
+                {
+                    throw new Exception("Expected a reduced array of length " + _arraySize + " but got "
+                        + (sum == null ? "null" : "length " + sum.Length) + " on iteration " + i);
+                }
+
+                // Log the length, not the array object: formatting an int[] only prints its type name.
+                Logger.Log(Level.Info, "Received sum array of length {0} on iteration: {1}", sum.Length, i);
+
+                // Every element of the reduced array is expected to equal this value.
+                int expected = TriangleNumber(i) * _numReduceSenders;
+
+                for (int j = 0; j < sum.Length; j++)
+                {
+                    if (sum[j] != expected)
+                    {
+                        throw new Exception("Expected " + expected + " but got " + sum[j] + " at index " + j);
+                    }
+                }
+            }
+
+            return null;
+        }
+
+        /// <summary>
+        /// Disposes the injected IMpiClient.
+        /// </summary>
+        public void Dispose()
+        {
+            _mpiClient.Dispose();
+        }
+
+        /// <summary>
+        /// Returns 1 + 2 + ... + n.
+        /// </summary>
+        private int TriangleNumber(int n)
+        {
+            return Enumerable.Range(1, n).Sum();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedSlaveTask.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedSlaveTask.cs
 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedSlaveTask.cs
new file mode 100644
index 0000000..503e6e3
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/PipelineBroadcastReduceDriverAndTasks/PipelinedSlaveTask.cs
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Linq;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Task;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.PipelineBroadcastReduceDriverAndTasks
+{
+    /// <summary>
+    /// Slave task of the pipelined broadcast/reduce example. Each iteration it receives
+    /// the broadcast array, computes the triangle number of the array's first element,
+    /// and sends an array of the same length filled with that number to the reduce operator.
+    /// </summary>
+    public class PipelinedSlaveTask : ITask
+    {
+        private static readonly Logger Logger = Logger.GetLogger(typeof(PipelinedSlaveTask));
+
+        private readonly int _numIterations;
+        private readonly IMpiClient _mpiClient;
+        private readonly ICommunicationGroupClient _commGroup;
+        private readonly IBroadcastReceiver<int[]> _broadcastReceiver;
+        private readonly IReduceSender<int[]> _triangleNumberSender;
+
+        /// <summary>
+        /// Stores the injected settings and looks up the broadcast receiver and reduce
+        /// sender of the communication group named GroupTestConstants.GroupName.
+        /// </summary>
+        /// <param name="numIters">Number of receive/send rounds to run.</param>
+        /// <param name="mpiClient">Client used to obtain the communication group; disposed in <see cref="Dispose"/>.</param>
+        [Inject]
+        public PipelinedSlaveTask(
+            [Parameter(typeof(GroupTestConfig.NumIterations))] int numIters,
+            IMpiClient mpiClient)
+        {
+            Logger.Log(Level.Info, "Hello from slave task");
+
+            _numIterations = numIters;
+            _mpiClient = mpiClient;
+            _commGroup = _mpiClient.GetCommunicationGroup(GroupTestConstants.GroupName);
+            _broadcastReceiver = _commGroup.GetBroadcastReceiver<int[]>(GroupTestConstants.BroadcastOperatorName);
+            _triangleNumberSender = _commGroup.GetReduceSender<int[]>(GroupTestConstants.ReduceOperatorName);
+        }
+
+        /// <summary>
+        /// Runs the receive/compute/send rounds.
+        /// </summary>
+        /// <param name="memento">Not used.</param>
+        /// <returns>Always null.</returns>
+        public byte[] Call(byte[] memento)
+        {
+            for (int iteration = 0; iteration < _numIterations; iteration++)
+            {
+                // The first element of the broadcast array carries n.
+                int[] received = _broadcastReceiver.Receive();
+                int n = received[0];
+
+                Logger.Log(Level.Info, "Calculating TriangleNumber({0}) on slave task...", n);
+
+                int triangle = TriangleNumber(n);
+                Logger.Log(Level.Info, "Sending sum: {0} on iteration {1}.", triangle, iteration);
+
+                // Reply with an array of the received length, every slot set to the triangle number.
+                int[] reply = Enumerable.Repeat(triangle, received.Length).ToArray();
+                _triangleNumberSender.Send(reply);
+            }
+
+            return null;
+        }
+
+        /// <summary>
+        /// Disposes the injected IMpiClient.
+        /// </summary>
+        public void Dispose()
+        {
+            _mpiClient.Dispose();
+        }
+
+        /// <summary>
+        /// Returns 1 + 2 + ... + n.
+        /// </summary>
+        private int TriangleNumber(int n)
+        {
+            IEnumerable<int> oneToN = Enumerable.Range(1, n);
+            return oneToN.Sum();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/MasterTask.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/MasterTask.cs
 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/MasterTask.cs
new file mode 100644
index 0000000..a3d17d7
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/MasterTask.cs
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System.Collections.Generic;
+using System.Linq;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Task;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.ScatterReduceDriverAndTasks
+{
+    /// <summary>
+    /// Master task of the scatter/reduce example. It scatters the integers 1..100
+    /// through the scatter operator and logs the value returned by the reduce operator.
+    /// </summary>
+    public class MasterTask : ITask
+    {
+        private static readonly Logger _logger = Logger.GetLogger(typeof(MasterTask));
+
+        private readonly IMpiClient _mpiClient;
+        private readonly ICommunicationGroupClient _commGroup;
+        private readonly IScatterSender<int> _scatterSender;
+        private readonly IReduceReceiver<int> _sumReducer;
+
+        /// <summary>
+        /// Looks up the scatter sender and reduce receiver of the communication
+        /// group named GroupTestConstants.GroupName.
+        /// </summary>
+        /// <param name="mpiClient">Client used to obtain the communication group; disposed in <see cref="Dispose"/>.</param>
+        [Inject]
+        public MasterTask(IMpiClient mpiClient)
+        {
+            _logger.Log(Level.Info, "Hello from master task");
+            _mpiClient = mpiClient;
+
+            _commGroup = mpiClient.GetCommunicationGroup(GroupTestConstants.GroupName);
+            _scatterSender = _commGroup.GetScatterSender<int>(GroupTestConstants.ScatterOperatorName);
+            _sumReducer = _commGroup.GetReduceReceiver<int>(GroupTestConstants.ReduceOperatorName);
+        }
+
+        /// <summary>
+        /// Sends the list 1..100 via the scatter sender, then logs the reduced sum.
+        /// </summary>
+        /// <param name="memento">Not used.</param>
+        /// <returns>Always null.</returns>
+        public byte[] Call(byte[] memento)
+        {
+            List<int> data = Enumerable.Range(1, 100).ToList();
+            _scatterSender.Send(data);
+
+            int sum = _sumReducer.Reduce();
+            _logger.Log(Level.Info, "Received sum: {0}", sum);
+
+            return null;
+        }
+
+        /// <summary>
+        /// Disposes the injected IMpiClient.
+        /// </summary>
+        public void Dispose()
+        {
+            _mpiClient.Dispose();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/c85d45a2/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs
 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs
new file mode 100644
index 0000000..fab12e5
--- /dev/null
+++ 
b/lang/cs/Org.Apache.REEF.Network.Examples/GroupCommunication/ScatterReduceDriverAndTasks/ScatterReduceDriver.cs
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using Org.Apache.REEF.Common.Io;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Bridge;
+using Org.Apache.REEF.Driver.Context;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Network.Group.Driver;
+using Org.Apache.REEF.Network.Group.Driver.Impl;
+using Org.Apache.REEF.Network.Group.Operators;
+using Org.Apache.REEF.Network.Group.Topology;
+using Org.Apache.REEF.Network.NetworkService;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Remote.Impl;
+
+namespace 
Org.Apache.REEF.Network.Examples.GroupCommunication.ScatterReduceDriverAndTasks
+{
+    /// <summary>
+    /// Driver for the scatter/reduce group communication example. It requests
+    /// evaluators, submits the group communication context and service to each one,
+    /// and queues a master task or a slave task on every active context.
+    /// </summary>
+    public class ScatterReduceDriver : IStartHandler, IObserver<IEvaluatorRequestor>, IObserver<IAllocatedEvaluator>, IObserver<IActiveContext>, IObserver<IFailedEvaluator>
+    {
+        private static readonly Logger LOGGER = Logger.GetLogger(typeof(ScatterReduceDriver));
+
+        private readonly int _numEvaluators;
+
+        private readonly IMpiDriver _mpiDriver;
+        private readonly ICommunicationGroupDriver _commGroup;
+        private readonly TaskStarter _mpiTaskStarter;
+
+        /// <summary>
+        /// Builds the default communication group with a scatter operator and a reduce
+        /// operator, both rooted at GroupTestConstants.MasterTaskId, creates the task
+        /// starter, and generates the CLR class hierarchy.
+        /// </summary>
+        /// <param name="numEvaluators">Number of evaluators to request and tasks to start.</param>
+        /// <param name="mpiDriver">Group communication driver used to configure contexts, services and tasks.</param>
+        [Inject]
+        public ScatterReduceDriver(
+            [Parameter(typeof(GroupTestConfig.NumEvaluators))] int numEvaluators,
+            MpiDriver mpiDriver)
+        {
+            // NOTE(review): the identifier says "Broadcast" although this is the scatter/reduce
+            // driver; kept unchanged because the string may be relied upon at runtime.
+            Identifier = "BroadcastStartHandler";
+            _numEvaluators = numEvaluators;
+            _mpiDriver = mpiDriver; 
+            _commGroup = _mpiDriver.DefaultGroup
+                    .AddScatter<int, IntCodec>(
+                        GroupTestConstants.ScatterOperatorName,
+                            GroupTestConstants.MasterTaskId,
+                            TopologyTypes.Tree)
+                    .AddReduce<int, IntCodec>(
+                        GroupTestConstants.ReduceOperatorName,
+                            GroupTestConstants.MasterTaskId,
+                            new SumFunction())
+                    .Build();
+
+            _mpiTaskStarter = new TaskStarter(_mpiDriver, numEvaluators);
+
+            CreateClassHierarchy();
+        }
+
+        /// <summary>
+        /// Identifier of this start handler.
+        /// </summary>
+        public string Identifier { get; set; }
+
+        /// <summary>
+        /// Submits one request for the configured number of evaluators.
+        /// </summary>
+        /// <param name="evaluatorRequestor">Requestor the evaluator request is submitted to.</param>
+        public void OnNext(IEvaluatorRequestor evaluatorRequestor)
+        {
+            EvaluatorRequest request = new EvaluatorRequest(_numEvaluators, 512, 2, "WonderlandRack", "BroadcastEvaluator");
+            evaluatorRequestor.Submit(request);
+        }
+
+        /// <summary>
+        /// Submits the group communication context and service configurations to a newly allocated evaluator.
+        /// </summary>
+        /// <param name="allocatedEvaluator">The evaluator that was allocated.</param>
+        public void OnNext(IAllocatedEvaluator allocatedEvaluator)
+        {
+            IConfiguration contextConf = _mpiDriver.GetContextConfiguration();
+            IConfiguration serviceConf = _mpiDriver.GetServiceConfiguration();
+            allocatedEvaluator.SubmitContextAndService(contextConf, serviceConf);
+        }
+
+        /// <summary>
+        /// Registers a master or slave task with the communication group, depending on
+        /// whether the context is the master task context, and queues it on the task starter.
+        /// </summary>
+        /// <param name="activeContext">The context that became active.</param>
+        public void OnNext(IActiveContext activeContext)
+        {
+            if (_mpiDriver.IsMasterTaskContext(activeContext))
+            {
+                // Configure Master Task
+                IConfiguration partialTaskConf = TaskConfiguration.ConfigurationModule
+                    .Set(TaskConfiguration.Identifier, GroupTestConstants.MasterTaskId)
+                    .Set(TaskConfiguration.Task, GenericType<MasterTask>.Class)
+                    .Build();
+
+                _commGroup.AddTask(GroupTestConstants.MasterTaskId);
+                _mpiTaskStarter.QueueTask(partialTaskConf, activeContext);
+            }
+            else
+            {
+                // Configure Slave Task; the id is the slave prefix plus the context number.
+                string slaveTaskId = GroupTestConstants.SlaveTaskId +
+                    _mpiDriver.GetContextNum(activeContext);
+
+                IConfiguration partialTaskConf = TaskConfiguration.ConfigurationModule
+                    .Set(TaskConfiguration.Identifier, slaveTaskId)
+                    .Set(TaskConfiguration.Task, GenericType<SlaveTask>.Class)
+                    .Build();
+
+                _commGroup.AddTask(slaveTaskId);
+                _mpiTaskStarter.QueueTask(partialTaskConf, activeContext);
+            }
+        }
+
+        /// <summary>
+        /// Logs the failed evaluator. No recovery is attempted; the log entry exists so
+        /// the failure is not silently dropped.
+        /// </summary>
+        /// <param name="value">The evaluator that failed.</param>
+        public void OnNext(IFailedEvaluator value)
+        {
+            LOGGER.Log(Level.Error, "Evaluator {0} failed.", value.Id);
+        }
+
+        /// <summary>
+        /// Logs an error reported through the observer interfaces instead of discarding it.
+        /// </summary>
+        /// <param name="error">The reported exception.</param>
+        public void OnError(Exception error)
+        {
+            LOGGER.Log(Level.Error, "ScatterReduceDriver received an error: {0}", error);
+        }
+
+        /// <summary>
+        /// Observer completion callback. Intentionally does nothing.
+        /// </summary>
+        public void OnCompleted()
+        {
+        }
+
+        /// <summary>
+        /// Gathers the names of the assemblies that declare IDriver, ITask, this driver,
+        /// INameClient and INetworkService, and hands the set to
+        /// ClrHandlerHelper.GenerateClassHierarchy.
+        /// </summary>
+        private void CreateClassHierarchy()
+        {
+            HashSet<string> clrDlls = new HashSet<string>();
+            clrDlls.Add(typeof(IDriver).Assembly.GetName().Name);
+            clrDlls.Add(typeof(ITask).Assembly.GetName().Name);
+            clrDlls.Add(typeof(ScatterReduceDriver).Assembly.GetName().Name);
+            clrDlls.Add(typeof(INameClient).Assembly.GetName().Name);
+            clrDlls.Add(typeof(INetworkService<>).Assembly.GetName().Name);
+
+            ClrHandlerHelper.GenerateClassHierarchy(clrDlls);
+        }
+
+        /// <summary>
+        /// Reduce function that adds up a sequence of integers.
+        /// </summary>
+        private class SumFunction : IReduceFunction<int>
+        {
+            [Inject]
+            public SumFunction()
+            {
+            }
+
+            /// <summary>
+            /// Returns the sum of all values in <paramref name="elements"/>.
+            /// </summary>
+            public int Reduce(IEnumerable<int> elements)
+            {
+                return elements.Sum();
+            }
+        }
+    }
+}

Reply via email to