Adding a simple GraphComputer test of proper message passing in all three directions (IN, OUT, BOTH)
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/80c0b846 Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/80c0b846 Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/80c0b846 Branch: refs/heads/master Commit: 80c0b8465fe9bb6ddbe4522a36304c7ba054e909 Parents: f02ea33 Author: Graff, Philip B <philip.gr...@jhuapl.edu> Authored: Sat Jan 13 21:32:29 2018 -0500 Committer: Graff, Philip B <philip.gr...@jhuapl.edu> Committed: Sun Feb 25 10:44:15 2018 -0500 ---------------------------------------------------------------------- .../process/computer/GraphComputerTest.java | 175 +++++++++++++++++++ 1 file changed, 175 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/80c0b846/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java ---------------------------------------------------------------------- diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java index 8c846d5..c34c2dc 100644 --- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java +++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java @@ -34,6 +34,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.Operator; import org.apache.tinkerpop.gremlin.process.traversal.P; import org.apache.tinkerpop.gremlin.process.traversal.Traversal; import org.apache.tinkerpop.gremlin.process.traversal.Traverser; +import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__; import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyPath; import 
org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.VerificationException; @@ -2683,4 +2684,178 @@ public class GraphComputerTest extends AbstractGremlinProcessTest { }; } } + + /////////////////////////////////// + + @Test + public void testMessagePassingIn() throws Exception { + runTest(Direction.IN).forEachRemaining(v -> { + String in = v.value("propin").toString(); + if (in.equals("a")) { + assertEquals("ab", v.value("propout").toString()); + } else { + assertEquals("", v.value("propout").toString()); + } + }); + } + + @Test + public void testMessagePassingOut() throws Exception { + runTest(Direction.OUT).forEachRemaining(v -> { + String in = v.value("propin").toString(); + if (in.equals("a")) { + assertEquals("a", v.value("propout").toString()); + } else { + assertEquals("a", v.value("propout").toString()); + } + }); + } + + @Test + public void testMessagePassingBoth() throws Exception { + runTest(Direction.BOTH).forEachRemaining(v -> { + String in = v.value("propin").toString(); + if (in.equals("a")) { + assertEquals("aab", v.value("propout").toString()); + } else { + assertEquals("a", v.value("propout").toString()); + } + }); + } + + private GraphTraversal<Vertex, Vertex> runTest(Direction direction) throws Exception { + g.addV().property("propin", "a").as("a") + .addV().property("propin", "b").as("b") + .addE("edge").from("a").to("b").addE("edge").from("a").to("a").iterate(); + final VertexProgramR svp = VertexProgramR.build().propertyIn("propin") + .propertyOut("propout").direction(direction).create(); + final ComputerResult result = graphProvider.getGraphComputer(graph).program(svp).submit().get(); + return result.graph().traversal().V(); + } + + private static class VertexProgramR implements VertexProgram<String> { + private static final String SIMPLE_VERTEX_PROGRAM_CFG_PREFIX = "gremlin.simpleVertexProgram"; + private static final String PROPERTY_OUT_CFG_KEY = SIMPLE_VERTEX_PROGRAM_CFG_PREFIX + ".propertyout"; + private static final 
String PROPERTY_IN_CFG_KEY = SIMPLE_VERTEX_PROGRAM_CFG_PREFIX + ".propertyin"; + private static final String DIRECTION_CFG_KEY = SIMPLE_VERTEX_PROGRAM_CFG_PREFIX + ".direction"; + + private final MessageScope.Local<String> inMessageScope = MessageScope.Local.of(__::inE); + private final MessageScope.Local<String> outMessageScope = MessageScope.Local.of(__::outE); + private final MessageScope.Local<String> bothMessageScope = MessageScope.Local.of(__::bothE); + private MessageScope.Local<String> messageScope; + private Set<VertexComputeKey> vertexComputeKeys; + + private String propertyout, propertyin; + + /** + * Clones this vertex program. + * + * @return a clone of this vertex program + */ + public VertexProgramR clone() { return this; } + + @Override + public void loadState(final Graph graph, final Configuration configuration) { + this.propertyout = configuration.getString(PROPERTY_OUT_CFG_KEY); + this.propertyin = configuration.getString(PROPERTY_IN_CFG_KEY); + Direction direction = Direction.valueOf(configuration.getString(DIRECTION_CFG_KEY)); + switch (direction) { + case IN: + this.messageScope = this.inMessageScope; + break; + case OUT: + this.messageScope = this.outMessageScope; + break; + default: + this.messageScope = this.bothMessageScope; + break; + } + this.vertexComputeKeys = new HashSet<>(Arrays.asList(VertexComputeKey.of(this.propertyout, false), + VertexComputeKey.of(this.propertyin, false))); + } + + @Override + public void setup(Memory memory) { + } + + @Override + public void execute(Vertex vertex, Messenger<String> messenger, Memory memory) { + if (memory.isInitialIteration()) { + messenger.sendMessage(this.messageScope, vertex.value(this.propertyin).toString()); + } else { + char[] composite = IteratorUtils.reduce(messenger.receiveMessages(), "", (a, b) -> a + b).toCharArray(); + Arrays.sort(composite); + vertex.property(this.propertyout, new String(composite)); + } + } + + @Override + public boolean terminate(Memory memory) { + return 
!memory.isInitialIteration(); + } + + @Override + public Set<MessageScope> getMessageScopes(Memory memory) { + return Collections.singleton(this.messageScope); + } + + @Override + public GraphComputer.ResultGraph getPreferredResultGraph() { + return GraphComputer.ResultGraph.NEW; + } + + @Override + public GraphComputer.Persist getPreferredPersist() { + return GraphComputer.Persist.VERTEX_PROPERTIES; + } + + @Override + public Set<VertexComputeKey> getVertexComputeKeys() { + return this.vertexComputeKeys; + } + + @Override + public Set<MemoryComputeKey> getMemoryComputeKeys() { + return Collections.emptySet(); + } + + public static Builder build() { + return new Builder(); + } + + static class Builder extends AbstractVertexProgramBuilder<Builder> { + + private Builder() { + super(VertexProgramR.class); + } + + @SuppressWarnings("unchecked") + @Override + public VertexProgramR create(final Graph graph) { + if (graph != null) { + ConfigurationUtils.append(graph.configuration().subset(SIMPLE_VERTEX_PROGRAM_CFG_PREFIX), configuration); + } + return (VertexProgramR) VertexProgram.createVertexProgram(graph, configuration); + } + + public VertexProgramR create() { + return create(null); + } + + public Builder propertyOut(final String name) { + configuration.setProperty(PROPERTY_OUT_CFG_KEY, name); + return this; + } + + public Builder propertyIn(final String name) { + configuration.setProperty(PROPERTY_IN_CFG_KEY, name); + return this; + } + + public Builder direction(final Direction direction) { + configuration.setProperty(DIRECTION_CFG_KEY, direction.toString()); + return this; + } + } + } }