Repository: incubator-reef
Updated Branches:
  refs/heads/master 2516ba49b -> ad431b097


[REEF-654] Flat topology does not provide configuration for gather & scatter 
operation

This addressed the issue by
  * Adding Gather and Scatter configuration for FlatTopology

JIRA:
  [REEF-654](https://issues.apache.org/jira/browse/REEF-654)

Pull Request:
  Closes #443

Author
  Gyeongin Yu [email protected]


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/ad431b09
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/ad431b09
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/ad431b09

Branch: refs/heads/master
Commit: ad431b09799ac6d5c975d6c07bfbf54bf08d2652
Parents: 2516ba4
Author: Gyeongin Yu <[email protected]>
Authored: Mon Aug 31 13:06:43 2015 +0900
Committer: Byung-Gon Chun <[email protected]>
Committed: Tue Sep 1 07:44:44 2015 +0900

----------------------------------------------------------------------
 .../network/group/impl/driver/FlatTopology.java | 24 +++++++++++++++-----
 1 file changed, 18 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ad431b09/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/FlatTopology.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/FlatTopology.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/FlatTopology.java
index a2da654..5b69b26 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/FlatTopology.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/FlatTopology.java
@@ -27,14 +27,13 @@ import 
org.apache.reef.io.network.group.impl.GroupChangesCodec;
 import org.apache.reef.io.network.group.impl.GroupChangesImpl;
 import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
 import org.apache.reef.io.network.group.impl.config.BroadcastOperatorSpec;
+import org.apache.reef.io.network.group.impl.config.GatherOperatorSpec;
 import org.apache.reef.io.network.group.impl.config.ReduceOperatorSpec;
+import org.apache.reef.io.network.group.impl.config.ScatterOperatorSpec;
 import org.apache.reef.io.network.group.impl.config.parameters.DataCodec;
 import 
org.apache.reef.io.network.group.impl.config.parameters.ReduceFunctionParam;
 import org.apache.reef.io.network.group.impl.config.parameters.TaskVersion;
-import org.apache.reef.io.network.group.impl.operators.BroadcastReceiver;
-import org.apache.reef.io.network.group.impl.operators.BroadcastSender;
-import org.apache.reef.io.network.group.impl.operators.ReduceReceiver;
-import org.apache.reef.io.network.group.impl.operators.ReduceSender;
+import org.apache.reef.io.network.group.impl.operators.*;
 import org.apache.reef.io.network.group.impl.utils.Utils;
 import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
 import org.apache.reef.io.serialization.Codec;
@@ -119,8 +118,7 @@ public class FlatTopology implements Topology {
       } else {
         jcb.bindImplementation(GroupCommOperator.class, 
BroadcastReceiver.class);
       }
-    }
-    if (operatorSpec instanceof ReduceOperatorSpec) {
+    } else if (operatorSpec instanceof ReduceOperatorSpec) {
       final ReduceOperatorSpec reduceOperatorSpec = (ReduceOperatorSpec) 
operatorSpec;
       jcb.bindNamedParameter(ReduceFunctionParam.class, 
reduceOperatorSpec.getRedFuncClass());
       if (taskId.equals(reduceOperatorSpec.getReceiverId())) {
@@ -128,6 +126,20 @@ public class FlatTopology implements Topology {
       } else {
         jcb.bindImplementation(GroupCommOperator.class, ReduceSender.class);
       }
+    } else if (operatorSpec instanceof ScatterOperatorSpec) {
+      final ScatterOperatorSpec scatterOperatorSpec = (ScatterOperatorSpec) 
operatorSpec;
+      if (taskId.equals(scatterOperatorSpec.getSenderId())) {
+        jcb.bindImplementation(GroupCommOperator.class, ScatterSender.class);
+      } else {
+        jcb.bindImplementation(GroupCommOperator.class, ScatterReceiver.class);
+      }
+    } else if (operatorSpec instanceof GatherOperatorSpec) {
+      final GatherOperatorSpec gatherOperatorSpec = (GatherOperatorSpec) 
operatorSpec;
+      if (taskId.equals(gatherOperatorSpec.getReceiverId())) {
+        jcb.bindImplementation(GroupCommOperator.class, GatherReceiver.class);
+      } else {
+        jcb.bindImplementation(GroupCommOperator.class, GatherSender.class);
+      }
     }
     return jcb.build();
   }

Reply via email to