Repository: incubator-reef Updated Branches: refs/heads/master 2516ba49b -> ad431b097
[REEF-654] Flat topology does not provide configuration for gather & scatter operation This addressed the issue by * Adding Gather and Scatter configuration for FlatTopology JIRA: [REEF-654](https://issues.apache.org/jira/browse/REEF-654) Pull Request: Closes #443 Author Gyeongin Yu [email protected] Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/ad431b09 Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/ad431b09 Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/ad431b09 Branch: refs/heads/master Commit: ad431b09799ac6d5c975d6c07bfbf54bf08d2652 Parents: 2516ba4 Author: Gyeongin Yu <[email protected]> Authored: Mon Aug 31 13:06:43 2015 +0900 Committer: Byung-Gon Chun <[email protected]> Committed: Tue Sep 1 07:44:44 2015 +0900 ---------------------------------------------------------------------- .../network/group/impl/driver/FlatTopology.java | 24 +++++++++++++++----- 1 file changed, 18 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/ad431b09/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/FlatTopology.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/FlatTopology.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/FlatTopology.java index a2da654..5b69b26 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/FlatTopology.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/driver/FlatTopology.java @@ -27,14 +27,13 @@ import org.apache.reef.io.network.group.impl.GroupChangesCodec; import org.apache.reef.io.network.group.impl.GroupChangesImpl; import org.apache.reef.io.network.group.impl.GroupCommunicationMessage; import org.apache.reef.io.network.group.impl.config.BroadcastOperatorSpec; +import org.apache.reef.io.network.group.impl.config.GatherOperatorSpec; import org.apache.reef.io.network.group.impl.config.ReduceOperatorSpec; +import org.apache.reef.io.network.group.impl.config.ScatterOperatorSpec; import org.apache.reef.io.network.group.impl.config.parameters.DataCodec; import org.apache.reef.io.network.group.impl.config.parameters.ReduceFunctionParam; import org.apache.reef.io.network.group.impl.config.parameters.TaskVersion; -import org.apache.reef.io.network.group.impl.operators.BroadcastReceiver; -import org.apache.reef.io.network.group.impl.operators.BroadcastSender; -import org.apache.reef.io.network.group.impl.operators.ReduceReceiver; -import org.apache.reef.io.network.group.impl.operators.ReduceSender; +import org.apache.reef.io.network.group.impl.operators.*; import org.apache.reef.io.network.group.impl.utils.Utils; import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos; import org.apache.reef.io.serialization.Codec; @@ -119,8 +118,7 @@ public class FlatTopology implements Topology { } else { jcb.bindImplementation(GroupCommOperator.class, BroadcastReceiver.class); } - } - if (operatorSpec instanceof ReduceOperatorSpec) { + } else if (operatorSpec instanceof ReduceOperatorSpec) { final ReduceOperatorSpec reduceOperatorSpec = (ReduceOperatorSpec) operatorSpec; jcb.bindNamedParameter(ReduceFunctionParam.class, reduceOperatorSpec.getRedFuncClass()); if (taskId.equals(reduceOperatorSpec.getReceiverId())) { @@ -128,6 +126,20 @@ public class FlatTopology implements Topology { } else { jcb.bindImplementation(GroupCommOperator.class, ReduceSender.class); } + } else if (operatorSpec instanceof ScatterOperatorSpec) { + final ScatterOperatorSpec scatterOperatorSpec = (ScatterOperatorSpec) operatorSpec; + if (taskId.equals(scatterOperatorSpec.getSenderId())) { + jcb.bindImplementation(GroupCommOperator.class, ScatterSender.class); + } else { + jcb.bindImplementation(GroupCommOperator.class, ScatterReceiver.class); + } + } else if (operatorSpec instanceof GatherOperatorSpec) { + final GatherOperatorSpec gatherOperatorSpec = (GatherOperatorSpec) operatorSpec; + if (taskId.equals(gatherOperatorSpec.getReceiverId())) { + jcb.bindImplementation(GroupCommOperator.class, GatherReceiver.class); + } else { + jcb.bindImplementation(GroupCommOperator.class, GatherSender.class); + } } return jcb.build(); }
