Added Serializer.cloneObject() which clones via serialization (helper method). TinkerGraphComputer now has a sound distributed Memory system where each worker/thread aggregates locally without concurrency and then, at the end of the iteration, the thread-distributed memories are aggregated into the main TinkerMemory.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/1ac003d0 Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/1ac003d0 Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/1ac003d0 Branch: refs/heads/tp32 Commit: 1ac003d00807c1351594d04a4bbdb55e93b00134 Parents: 056d7ae Author: Marko A. Rodriguez <[email protected]> Authored: Wed Jan 4 04:06:30 2017 -0700 Committer: Marko A. Rodriguez <[email protected]> Committed: Thu Jan 5 16:59:45 2017 -0700 ---------------------------------------------------------------------- .../apache/tinkerpop/gremlin/util/Serializer.java | 4 ++++ .../process/computer/TinkerGraphComputer.java | 8 +++----- .../process/computer/TinkerWorkerMemory.java | 6 +++--- .../process/computer/TinkerWorkerPool.java | 15 ++++++++++++--- 4 files changed, 22 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1ac003d0/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/Serializer.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/Serializer.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/Serializer.java index 28bab16..988fce3 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/Serializer.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/Serializer.java @@ -47,4 +47,8 @@ public final class Serializer { in.close(); return object; } + + public static <V> V cloneObject(final V object) throws IOException, ClassNotFoundException { + return (V) Serializer.deserializeObject(Serializer.serializeObject(object)); + } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1ac003d0/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java 
---------------------------------------------------------------------- diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java index fef2e1a..7523d63 100644 --- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java +++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java @@ -157,7 +157,7 @@ public final class TinkerGraphComputer implements GraphComputer { return computerService.submit(() -> { final long time = System.currentTimeMillis(); final TinkerGraphComputerView view; - final TinkerWorkerPool workers = new TinkerWorkerPool(this.workers); + final TinkerWorkerPool workers = new TinkerWorkerPool(this.memory, this.workers); try { if (null != this.vertexProgram) { view = TinkerHelper.createGraphComputerView(this.graph, this.graphFilter, this.vertexProgram.getVertexComputeKeys()); @@ -168,8 +168,7 @@ public final class TinkerGraphComputer implements GraphComputer { this.memory.completeSubRound(); workers.setVertexProgram(this.vertexProgram); final SynchronizedIterator<Vertex> vertices = new SynchronizedIterator<>(this.graph.vertices()); - workers.executeVertexProgram(vertexProgram -> { - final TinkerWorkerMemory workerMemory = new TinkerWorkerMemory(this.memory); + workers.executeVertexProgram((vertexProgram, workerMemory) -> { vertexProgram.workerIterationStart(workerMemory.asImmutable()); while (true) { final Vertex vertex = vertices.next(); @@ -178,8 +177,7 @@ public final class TinkerGraphComputer implements GraphComputer { vertexProgram.execute( ComputerGraph.vertexProgram(vertex, vertexProgram), new TinkerMessenger<>(vertex, this.messageBoard, vertexProgram.getMessageCombiner()), - workerMemory - ); + workerMemory); } 
vertexProgram.workerIterationEnd(workerMemory.asImmutable()); workerMemory.complete(); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1ac003d0/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java ---------------------------------------------------------------------- diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java index 28b99e3..081e4fa 100644 --- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java +++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java @@ -21,7 +21,6 @@ package org.apache.tinkerpop.gremlin.tinkergraph.process.computer; import org.apache.tinkerpop.gremlin.process.computer.Memory; import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey; -import org.apache.tinkerpop.gremlin.process.computer.VertexProgram; import org.apache.tinkerpop.gremlin.util.Serializer; import java.io.IOException; @@ -43,10 +42,10 @@ public final class TinkerWorkerMemory implements Memory.Admin { this.mainMemory = mainMemory; for (final MemoryComputeKey key : this.mainMemory.memoryKeys.values()) { try { - final MemoryComputeKey clone = (MemoryComputeKey) Serializer.deserializeObject(Serializer.serializeObject(key)); + final MemoryComputeKey clone = Serializer.cloneObject(key); this.reducers.put(clone.getKey(), clone.getReducer()); } catch (final IOException | ClassNotFoundException e) { - this.reducers.put(key.getKey(), key.getReducer()); // super ghetto + throw new IllegalStateException(e.getMessage(), e); } } } @@ -112,5 +111,6 @@ public final class TinkerWorkerMemory implements Memory.Admin { for (final Map.Entry<String, Object> entry : this.workerMemory.entrySet()) { 
this.mainMemory.add(entry.getKey(), entry.getValue()); } + this.workerMemory.clear(); } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1ac003d0/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java ---------------------------------------------------------------------- diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java index 3d851bf..140d347 100644 --- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java +++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java @@ -24,10 +24,13 @@ import org.apache.tinkerpop.gremlin.process.computer.VertexProgram; import org.apache.tinkerpop.gremlin.process.computer.util.MapReducePool; import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramPool; +import java.util.Queue; import java.util.concurrent.CompletionService; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.function.BiConsumer; import java.util.function.Consumer; /** @@ -44,11 +47,15 @@ public final class TinkerWorkerPool implements AutoCloseable { private VertexProgramPool vertexProgramPool; private MapReducePool mapReducePool; + private final Queue<TinkerWorkerMemory> workerMemoryPool = new ConcurrentLinkedQueue<>(); - public TinkerWorkerPool(final int numberOfWorkers) { + public TinkerWorkerPool(final TinkerMemory memory, final int numberOfWorkers) { this.numberOfWorkers = numberOfWorkers; this.workerPool = Executors.newFixedThreadPool(numberOfWorkers, THREAD_FACTORY_WORKER); this.completionService = new 
ExecutorCompletionService<>(this.workerPool); + for (int i = 0; i < this.numberOfWorkers; i++) { + this.workerMemoryPool.add(new TinkerWorkerMemory(memory)); + } } public void setVertexProgram(final VertexProgram vertexProgram) { @@ -59,12 +66,14 @@ public final class TinkerWorkerPool implements AutoCloseable { this.mapReducePool = new MapReducePool(mapReduce, this.numberOfWorkers); } - public void executeVertexProgram(final Consumer<VertexProgram> worker) throws InterruptedException { + public void executeVertexProgram(final BiConsumer<VertexProgram, TinkerWorkerMemory> worker) throws InterruptedException { for (int i = 0; i < this.numberOfWorkers; i++) { this.completionService.submit(() -> { final VertexProgram vp = this.vertexProgramPool.take(); - worker.accept(vp); + final TinkerWorkerMemory workerMemory = this.workerMemoryPool.poll(); + worker.accept(vp, workerMemory); this.vertexProgramPool.offer(vp); + this.workerMemoryPool.offer(workerMemory); return null; }); }
