Repository: tinkerpop Updated Branches: refs/heads/TINKERPOP-1564 53bab282e -> 23c48cf2f
okay, all direct references to akka URIs are removed from gremlin-core. I have one more thing to do with Message priorities. After that, clean, Javadoc, etc. Going to take a break for a bit first. Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/23c48cf2 Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/23c48cf2 Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/23c48cf2 Branch: refs/heads/TINKERPOP-1564 Commit: 23c48cf2ff39538f87c81a9c6ac4283f9609a506 Parents: 53bab28 Author: Marko A. Rodriguez <okramma...@gmail.com> Authored: Tue Dec 13 10:18:12 2016 -0700 Committer: Marko A. Rodriguez <okramma...@gmail.com> Committed: Tue Dec 13 10:18:12 2016 -0700 ---------------------------------------------------------------------- .../gremlin/akka/process/actor/AkkaActors.java | 13 ++- .../gremlin/akka/process/actor/MasterActor.java | 84 ++++++++++++++++++ .../process/actor/MasterTraversalActor.java | 84 ------------------ .../gremlin/akka/process/actor/WorkerActor.java | 91 ++++++++++++++++++++ .../process/actor/WorkerTraversalActor.java | 91 -------------------- .../tinkerpop/gremlin/process/actor/Actors.java | 6 +- .../gremlin/process/actor/Address.java | 11 ++- .../actor/traversal/TraversalActorProgram.java | 16 ++-- .../actor/traversal/TraversalMasterProgram.java | 19 ++-- .../actor/traversal/TraversalWorkerProgram.java | 32 +++---- .../actor/traversal/step/map/ActorStep.java | 3 +- 11 files changed, 222 insertions(+), 228 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/23c48cf2/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java index db024f6..de301c1 100644 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java @@ -24,7 +24,6 @@ import akka.actor.Props; import org.apache.tinkerpop.gremlin.process.actor.ActorProgram; import org.apache.tinkerpop.gremlin.process.actor.Actors; import org.apache.tinkerpop.gremlin.process.actor.Address; -import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet; import org.apache.tinkerpop.gremlin.structure.Partitioner; import java.util.concurrent.CompletableFuture; @@ -33,16 +32,16 @@ import java.util.concurrent.Future; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public final class AkkaActors<S, E> implements Actors<S, E> { +public final class AkkaActors<R> implements Actors<R> { - private final ActorProgram actorProgram; + private final ActorProgram<R> actorProgram; private final ActorSystem system; private final Address.Master master; - public AkkaActors(final ActorProgram actorProgram, final Partitioner partitioner) { + public AkkaActors(final ActorProgram<R> actorProgram, final Partitioner partitioner) { this.actorProgram = actorProgram; this.system = ActorSystem.create("traversal-" + actorProgram.hashCode()); - this.master = new Address.Master(this.system.actorOf(Props.create(MasterTraversalActor.class, this.actorProgram, partitioner), "master").path().toString()); + this.master = new Address.Master(this.system.actorOf(Props.create(MasterActor.class, this.actorProgram, partitioner), "master").path().toString()); } @Override @@ -51,12 +50,12 @@ public final class AkkaActors<S, E> implements Actors<S, E> { } @Override - public Future<TraverserSet<E>> submit() { + public Future<R> submit() { return CompletableFuture.supplyAsync(() -> { while (!this.system.isTerminated()) { } - return (TraverserSet) this.actorProgram.getResult(); + return this.actorProgram.getResult(); }); } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/23c48cf2/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java new file mode 100644 index 0000000..d7b45fa --- /dev/null +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.akka.process.actor; + +import akka.actor.AbstractActor; +import akka.actor.ActorSelection; +import akka.actor.Props; +import akka.dispatch.RequiresMessageQueue; +import akka.japi.pf.ReceiveBuilder; +import org.apache.tinkerpop.gremlin.process.actor.Actor; +import org.apache.tinkerpop.gremlin.process.actor.ActorProgram; +import org.apache.tinkerpop.gremlin.process.actor.Address; +import org.apache.tinkerpop.gremlin.structure.Partition; +import org.apache.tinkerpop.gremlin.structure.Partitioner; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class MasterActor extends AbstractActor implements RequiresMessageQueue<TraverserMailbox.TraverserSetSemantics>, Actor.Master { + + private final Address.Master master; + private final List<Address.Worker> workers; + private final Map<Address, ActorSelection> actors = new HashMap<>(); + + public MasterActor(final ActorProgram program, final Partitioner partitioner) { + this.master = new Address.Master(self().path().toString()); + this.workers = new ArrayList<>(); + final List<Partition> partitions = partitioner.getPartitions(); + for (final Partition partition : partitions) { + this.workers.add(new Address.Worker("worker-" + partition.hashCode())); + context().actorOf(Props.create(WorkerActor.class, program, partitioner, partition), "worker-" + partition.hashCode()); + } + final ActorProgram.Master masterProgram = program.createMasterProgram(this); + receive(ReceiveBuilder.matchAny(masterProgram::execute).build()); + masterProgram.setup(); + } + + @Override + public <M> void send(final Address toActor, final M message) { + ActorSelection actor = this.actors.get(toActor); + if (null == actor) { + actor = context().actorSelection(toActor.location()); + this.actors.put(toActor, actor); + } + actor.tell(message, self()); + } + + @Override + public List<Address.Worker> workers() { + return this.workers; + } + + @Override + public Address.Master address() { + return this.master; + } + + @Override + public void close() { + context().system().terminate(); + } +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/23c48cf2/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterTraversalActor.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterTraversalActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterTraversalActor.java deleted file mode 100644 index 6799a28..0000000 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterTraversalActor.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.akka.process.actor; - -import akka.actor.AbstractActor; -import akka.actor.ActorSelection; -import akka.actor.Props; -import akka.dispatch.RequiresMessageQueue; -import akka.japi.pf.ReceiveBuilder; -import org.apache.tinkerpop.gremlin.process.actor.Actor; -import org.apache.tinkerpop.gremlin.process.actor.ActorProgram; -import org.apache.tinkerpop.gremlin.process.actor.Address; -import org.apache.tinkerpop.gremlin.structure.Partition; -import org.apache.tinkerpop.gremlin.structure.Partitioner; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -public final class MasterTraversalActor extends AbstractActor implements RequiresMessageQueue<TraverserMailbox.TraverserSetSemantics>, Actor.Master { - - private final Address.Master master; - private final List<Address.Worker> workers; - private final Map<Address, ActorSelection> actors = new HashMap<>(); - - public MasterTraversalActor(final ActorProgram program, final Partitioner partitioner) { - this.master = new Address.Master(self().path().toString()); - this.workers = new ArrayList<>(); - final List<Partition> partitions = partitioner.getPartitions(); - for (final Partition partition : partitions) { - this.workers.add(new Address.Worker("worker-" + partition.hashCode())); - context().actorOf(Props.create(WorkerTraversalActor.class, program, partitioner, partition), "worker-" + partition.hashCode()); - } - final ActorProgram.Master masterProgram = program.createMasterProgram(this); - receive(ReceiveBuilder.matchAny(masterProgram::execute).build()); - masterProgram.setup(); - } - - @Override - public <M> void send(final Address toActor, final M message) { - ActorSelection actor = this.actors.get(toActor); - if (null == actor) { - actor = context().actorSelection(toActor.location()); - this.actors.put(toActor, actor); - } - actor.tell(message, self()); - } - - @Override - public List<Address.Worker> workers() { - return this.workers; - } - - @Override - public Address.Master address() { - return this.master; - } - - @Override - public void close() { - context().system().terminate(); - } -} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/23c48cf2/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java new file mode 100644 index 0000000..84dbe37 --- /dev/null +++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.akka.process.actor; + +import akka.actor.AbstractActor; +import akka.actor.ActorSelection; +import akka.dispatch.RequiresMessageQueue; +import akka.japi.pf.ReceiveBuilder; +import org.apache.tinkerpop.gremlin.process.actor.Actor; +import org.apache.tinkerpop.gremlin.process.actor.ActorProgram; +import org.apache.tinkerpop.gremlin.process.actor.Address; +import org.apache.tinkerpop.gremlin.structure.Partition; +import org.apache.tinkerpop.gremlin.structure.Partitioner; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class WorkerActor extends AbstractActor implements RequiresMessageQueue<TraverserMailbox.TraverserSetSemantics>, Actor.Worker { + + private final Partition localPartition; + private final Address.Worker self; + private final Address.Master master; + private final List<Address.Worker> workers; + private final Map<Address, ActorSelection> actors = new HashMap<>(); + + public WorkerActor(final ActorProgram program, final Partitioner partitioner, final Partition localPartition) { + this.localPartition = localPartition; + this.self = new Address.Worker("../worker-" + localPartition.hashCode()); + this.master = new Address.Master(context().parent().path().toString()); + this.workers = new ArrayList<>(); + for (final Partition partition : partitioner.getPartitions()) { + this.workers.add(new Address.Worker("../worker-" + partition.hashCode())); + } + ActorProgram.Worker workerProgram = program.createWorkerProgram(this); + receive(ReceiveBuilder.matchAny(workerProgram::execute).build()); + workerProgram.setup(); + } + + @Override + public <M> void send(final Address toActor, final M message) { + ActorSelection actor = this.actors.get(toActor); + if (null == actor) { + actor = context().actorSelection(toActor.location()); + this.actors.put(toActor, actor); + } + actor.tell(message, self()); + } + + @Override + public List<Address.Worker> workers() { + return this.workers; + } + + @Override + public Partition partition() { + return this.localPartition; + } + + @Override + public Address.Worker address() { + return this.self; + } + + @Override + public Address.Master master() { + return this.master; + } +} + http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/23c48cf2/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalActor.java ---------------------------------------------------------------------- diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalActor.java deleted file mode 100644 index 5a6bae7..0000000 --- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalActor.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.tinkerpop.gremlin.akka.process.actor; - -import akka.actor.AbstractActor; -import akka.actor.ActorSelection; -import akka.dispatch.RequiresMessageQueue; -import akka.japi.pf.ReceiveBuilder; -import org.apache.tinkerpop.gremlin.process.actor.Actor; -import org.apache.tinkerpop.gremlin.process.actor.ActorProgram; -import org.apache.tinkerpop.gremlin.process.actor.Address; -import org.apache.tinkerpop.gremlin.structure.Partition; -import org.apache.tinkerpop.gremlin.structure.Partitioner; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * @author Marko A. Rodriguez (http://markorodriguez.com) - */ -public final class WorkerTraversalActor extends AbstractActor implements RequiresMessageQueue<TraverserMailbox.TraverserSetSemantics>, Actor.Worker { - - private final Partition localPartition; - private final Address.Worker self; - private final Address.Master master; - private final List<Address.Worker> workers; - private final Map<Address, ActorSelection> actors = new HashMap<>(); - - public WorkerTraversalActor(final ActorProgram program, final Partitioner partitioner, final Partition localPartition) { - this.localPartition = localPartition; - this.self = new Address.Worker(self().path().toString()); - this.master = new Address.Master(context().parent().path().toString()); - this.workers = new ArrayList<>(); - for (final Partition partition : partitioner.getPartitions()) { - this.workers.add(new Address.Worker("../worker-" + partition.hashCode())); - } - ActorProgram.Worker workerProgram = program.createWorkerProgram(this); - receive(ReceiveBuilder.matchAny(workerProgram::execute).build()); - workerProgram.setup(); - } - - @Override - public <M> void send(final Address toActor, final M message) { - ActorSelection actor = this.actors.get(toActor); - if (null == actor) { - actor = context().actorSelection(toActor.location()); - this.actors.put(toActor, actor); - } - actor.tell(message, self()); - } - - @Override - public List<Address.Worker> workers() { - return this.workers; - } - - @Override - public Partition partition() { - return this.localPartition; - } - - @Override - public Address.Worker address() { - return this.self; - } - - @Override - public Address.Master master() { - return this.master; - } -} - http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/23c48cf2/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java index 2e410ec..7b0c4a4 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java @@ -19,16 +19,14 @@ package org.apache.tinkerpop.gremlin.process.actor; -import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet; - import java.util.concurrent.Future; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public interface Actors<S, E> { +public interface Actors<R> { public Address.Master master(); - public Future<TraverserSet<E>> submit(); + public Future<R> submit(); } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/23c48cf2/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Address.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Address.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Address.java index c598eb7..ff45e30 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Address.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Address.java @@ -36,15 +36,22 @@ public abstract class Address implements Serializable { return this.location; } + @Override public boolean equals(final Object other) { return other instanceof Address && ((Address) other).location.equals(this.location); } + @Override public int hashCode() { return this.location.hashCode(); } - public static class Master extends Address { + @Override + public String toString() { + return this.location(); + } + + public static final class Master extends Address { public Master(final String location) { super(location); @@ -52,7 +59,7 @@ public abstract class Address implements Serializable { } - public static class Worker extends Address { + public static final class Worker extends Address { public Worker(final String location) { super(location); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/23c48cf2/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java index 278fb3b..e72b989 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java @@ -33,24 +33,24 @@ import org.apache.tinkerpop.gremlin.structure.Partitioner; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public final class TraversalActorProgram<M> implements ActorProgram<M> { +public final class TraversalActorProgram<R> implements ActorProgram<TraverserSet<R>> { - private final Traversal.Admin<?, ?> traversal; + private final Traversal.Admin<?, R> traversal; private final Partitioner partitioner; - public TraverserSet<?> result = new TraverserSet<>(); + public TraverserSet<R> result = new TraverserSet<>(); - public TraversalActorProgram(final Traversal.Admin<?, ?> traversal, final Partitioner partitioner) { + public TraversalActorProgram(final Traversal.Admin<?, R> traversal, final Partitioner partitioner) { this.partitioner = partitioner; final TraversalStrategies strategies = traversal.getStrategies().clone(); strategies.removeStrategies(ComputerVerificationStrategy.class, StandardVerificationStrategy.class); strategies.addStrategies(ActorVerificationStrategy.instance()); traversal.setStrategies(strategies); traversal.applyStrategies(); - this.traversal = ((TraversalVertexProgramStep) traversal.getStartStep()).computerTraversal.get(); + this.traversal = (Traversal.Admin) ((TraversalVertexProgramStep) traversal.getStartStep()).computerTraversal.get(); } @Override - public Worker<M> createWorkerProgram(final Actor.Worker worker) { + public Worker createWorkerProgram(final Actor.Worker worker) { return new TraversalWorkerProgram<>(worker, this.traversal.clone(), this.partitioner); } @@ -60,7 +60,7 @@ public final class TraversalActorProgram<M> implements ActorProgram<M> { } @Override - public M getResult() { - return (M) this.result; + public TraverserSet<R> getResult() { + return this.result; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/23c48cf2/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java index 654969b..ba051e2 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java @@ -48,32 +48,27 @@ import java.util.Map; public class TraversalMasterProgram<M> implements ActorProgram.Master<M> { private final Actor.Master master; - private final Map<String, Address.Worker> workers = new HashMap<>(); private final Traversal.Admin<?, ?> traversal; private final TraversalMatrix<?, ?> matrix; private final Partitioner partitioner; private Map<String, Barrier> barriers = new HashMap<>(); private final TraverserSet<?> results; - private final String leaderWorker; + private Address.Worker leaderWorker; public TraversalMasterProgram(final Actor.Master master, final Traversal.Admin<?, ?> traversal, final Partitioner partitioner, final TraverserSet<?> results) { this.traversal = traversal; - System.out.println("master[created]: " + master.address().location()); - System.out.println(this.traversal); + //System.out.println("master[created]: " + master.address().location()); + //System.out.println(this.traversal); this.matrix = new TraversalMatrix<>(this.traversal); this.partitioner = partitioner; this.results = results; this.master = master; - this.leaderWorker = "worker-" + this.partitioner.getPartitions().get(0).hashCode(); } @Override public void setup() { - for (final Address.Worker worker : master.workers()) { - this.workers.put(worker.location(), worker); - } + this.leaderWorker = this.master.workers().get(0); this.broadcast(StartMessage.instance()); - } @Override @@ -105,7 +100,7 @@ public class TraversalMasterProgram<M> implements ActorProgram.Master<M> { } } this.barriers.clear(); - this.master.send(this.workers.get(this.leaderWorker), StartMessage.instance()); + this.master.send(this.leaderWorker, StartMessage.instance()); } else { while (this.traversal.hasNext()) { this.results.add((Traverser.Admin) this.traversal.nextTraverser()); @@ -123,7 +118,7 @@ public class TraversalMasterProgram<M> implements ActorProgram.Master<M> { } private void broadcast(final Object message) { - for (final Address.Worker worker : this.workers.values()) { + for (final Address.Worker worker : this.master.workers()) { this.master.send(worker, message); } } @@ -145,7 +140,7 @@ public class TraversalMasterProgram<M> implements ActorProgram.Master<M> { if (traverser.isHalted()) this.results.add(traverser); else if (traverser.get() instanceof Element) - this.master.send(this.workers.get("worker-" + this.partitioner.getPartition((Element) traverser.get()).hashCode()), traverser); + this.master.send(this.master.workers().get(this.partitioner.getPartitions().indexOf(this.partitioner.getPartition((Element) traverser.get()))), traverser); else this.master.send(this.master.address(), traverser); } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/23c48cf2/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java index 58e06d6..4275caa 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java @@ -61,8 +61,7 @@ public class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> { private final Partition localPartition; private final Partitioner partitioner; // - private final Map<String, Address.Worker> workers = new HashMap<>(); - private final String neighborWorker; + private Address.Worker neighborWorker; private boolean isLeader; private Terminate terminate = null; private boolean voteToHalt = false; @@ -70,10 +69,10 @@ public class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> { public TraversalWorkerProgram(final Actor.Worker self, final Traversal.Admin<?, ?> traversal, final Partitioner partitioner) { this.self = self; - System.out.println("worker[created]: " + this.self.address().location()); + // System.out.println("worker[created]: " + this.self.address().location()); // set up partition and traversal information - this.localPartition = self.partition(); this.partitioner = partitioner; + this.localPartition = self.partition(); final WorkerTraversalSideEffects sideEffects = new WorkerTraversalSideEffects(traversal.getSideEffects(), this.self); TraversalHelper.applyTraversalRecursively(t -> t.setSideEffects(sideEffects), traversal); this.matrix = new TraversalMatrix<>(traversal); @@ -88,19 +87,14 @@ public class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> { ((GraphStep<Edge, Edge>) traversal.getStartStep()).setIteratorSupplier( () -> IteratorUtils.filter(this.localPartition.edges(graphStep.getIds()), this.localPartition::contains)); } - // create termination ring topology - final int i = this.partitioner.getPartitions().indexOf(this.localPartition); - this.neighborWorker = "../worker-" + this.partitioner.getPartitions().get(i == this.partitioner.getPartitions().size() - 1 ? 0 : i + 1).hashCode(); - this.isLeader = i == 0; - for (final Address.Worker worker : self.workers()) { - //if (!worker.equals(this.self.address())) - this.workers.put(worker.location(), worker); - } } @Override public void setup() { - + // create termination ring topology + final int i = this.self.workers().indexOf(this.self.address()); + this.neighborWorker = this.self.workers().get(i == this.self.workers().size() - 1 ? 0 : i + 1); + this.isLeader = i == 0; } @Override @@ -113,7 +107,7 @@ public class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> { this.sendTraverser(step.next()); } // internal vote to have in mailbox as final message to process - // assert null == this.terminate; + assert null == this.terminate; if (this.isLeader) { this.terminate = Terminate.MAYBE; this.self.send(this.self.address(), VoteToHaltMessage.instance()); @@ -125,7 +119,7 @@ public class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> { } else if (message instanceof SideEffectSetMessage) { this.matrix.getTraversal().getSideEffects().set(((SideEffectSetMessage) message).getKey(), ((SideEffectSetMessage) message).getValue()); } else if (message instanceof Terminate) { - // assert this.isLeader || this.terminate != Terminate.MAYBE; + assert this.isLeader || this.terminate != Terminate.MAYBE; this.terminate = (Terminate) message; this.self.send(this.self.address(), VoteToHaltMessage.instance()); } else if (message instanceof VoteToHaltMessage) { @@ -145,9 +139,9 @@ public class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> { if (this.voteToHalt && Terminate.YES == this.terminate) this.self.send(this.self.master(), VoteToHaltMessage.instance()); else - this.self.send(this.workers.get(this.neighborWorker), Terminate.YES); + this.self.send(this.neighborWorker, Terminate.YES); } else - this.self.send(this.workers.get(this.neighborWorker), this.voteToHalt ? this.terminate : Terminate.NO); + this.self.send(this.neighborWorker, this.voteToHalt ? this.terminate : Terminate.NO); this.terminate = null; this.voteToHalt = true; } @@ -169,7 +163,7 @@ public class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> { ////////////// private void processTraverser(final Traverser.Admin traverser) { - // assert !(traverser.get() instanceof Element) || !traverser.isHalted() || this.localPartition.contains((Element) traverser.get()); + assert !(traverser.get() instanceof Element) || !traverser.isHalted() || this.localPartition.contains((Element) traverser.get()); final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId()); if (step instanceof Bypassing) ((Bypassing) step).setBypass(true); GraphComputing.atMaster(step, false); @@ -188,7 +182,7 @@ public class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> { if (traverser.isHalted()) this.self.send(this.self.master(), traverser); else if (traverser.get() instanceof Element && !this.localPartition.contains((Element) traverser.get())) - this.self.send(this.workers.get("../worker-" + this.partitioner.getPartition((Element) traverser.get()).hashCode()), traverser); + this.self.send(this.self.workers().get(this.partitioner.getPartitions().indexOf(this.partitioner.getPartition((Element) traverser.get()))), traverser); else this.self.send(this.self.address(), traverser); } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/23c48cf2/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/ActorStep.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/ActorStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/ActorStep.java index 77be06b..207dd57 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/ActorStep.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/ActorStep.java @@ -32,6 +32,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.Traversal; import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies; import org.apache.tinkerpop.gremlin.process.traversal.Traverser; import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep; +import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet; import org.apache.tinkerpop.gremlin.structure.Partitioner; import org.apache.tinkerpop.gremlin.structure.util.StringFactory; @@ -70,7 +71,7 @@ public final class ActorStep<S, E> extends AbstractStep<E, E> { if (this.first) { this.first = false; try { - final Actors<S, E> actors = this.actorsClass.getConstructor(ActorProgram.class, Partitioner.class).newInstance(new TraversalActorProgram(this.partitionTraversal, partitioner), this.partitioner); + final Actors<TraverserSet<E>> actors = this.actorsClass.getConstructor(ActorProgram.class, Partitioner.class).newInstance(new TraversalActorProgram<E>(this.partitionTraversal, partitioner), this.partitioner); actors.submit().get().forEach(this.starts::add); } catch (final Exception e) { throw new IllegalStateException(e.getMessage(), e);