Forgot to add the TinkerWorkerMemory file.
/*
 * Provenance (commit notification metadata, preserved verbatim):
 *
 * Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
 * Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/056d7aed
 * Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/056d7aed
 * Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/056d7aed
 * Branch: refs/heads/TINKERPOP-1585
 * Commit: 056d7aedffa83f8d06462617670273857e2bea19
 * Parents: 0db0991
 * Author: Marko A. Rodriguez <okramma...@gmail.com>
 * Authored: Tue Jan 3 18:00:18 2017 -0700
 * Committer: Marko A. Rodriguez <okramma...@gmail.com>
 * Committed: Thu Jan 5 16:59:45 2017 -0700
 * ----------------------------------------------------------------------
 * .../process/computer/TinkerWorkerMemory.java | 116 +++++++++++++++++++
 * 1 file changed, 116 insertions(+)
 * ----------------------------------------------------------------------
 * http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/056d7aed/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java
 * ----------------------------------------------------------------------
 * diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java
 * new file mode 100644
 * index 0000000..28b99e3
 * --- /dev/null
 * +++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java
 * @@ -0,0 +1,116 @@
 */

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.tinkerpop.gremlin.tinkergraph.process.computer;

import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.util.Serializer;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.BinaryOperator;

/**
 * A {@link Memory.Admin} wrapper around a shared {@link TinkerMemory}.
 * <p>
 * Every operation except {@link #add(String, Object)} is forwarded directly to the wrapped
 * main memory. Calls to {@code add} are instead reduced into a local buffer using the reducer
 * registered for the key, and are only pushed to the main memory when {@link #complete()} is
 * invoked.
 * <p>
 * NOTE(review): presumably one instance is handed to each worker thread so that partial
 * reductions do not touch the shared memory on every {@code add} -- confirm against the caller.
 * The local buffer is a plain {@link HashMap} and is not synchronized.
 *
 * @author Marko A. Rodriguez (http://markorodriguez.com)
 */
public final class TinkerWorkerMemory implements Memory.Admin {

    /** The shared memory that all non-{@code add} operations are forwarded to. */
    private final TinkerMemory mainMemory;
    /** Locally reduced values, keyed by memory key, awaiting {@link #complete()}. */
    private final Map<String, Object> workerMemory = new HashMap<>();
    /** The reducer used to combine values for each memory key. */
    private final Map<String, BinaryOperator<Object>> reducers = new HashMap<>();

    /**
     * Creates a worker view over the given main memory and captures a reducer for every
     * memory compute key registered on it.
     *
     * @param mainMemory the shared memory to delegate to and eventually flush into
     */
    public TinkerWorkerMemory(final TinkerMemory mainMemory) {
        this.mainMemory = mainMemory;
        for (final MemoryComputeKey key : this.mainMemory.memoryKeys.values()) {
            try {
                // Copy the key through a serialization round trip so this instance gets its own
                // reducer object (presumably to avoid sharing a stateful reducer -- confirm).
                final MemoryComputeKey clone = (MemoryComputeKey) Serializer.deserializeObject(Serializer.serializeObject(key));
                this.reducers.put(clone.getKey(), clone.getReducer());
            } catch (final IOException | ClassNotFoundException ignored) {
                // Deliberate best effort: if the key cannot be copied, fall back to the
                // original (shared) reducer instead of failing construction.
                this.reducers.put(key.getKey(), key.getReducer());
            }
        }
    }

    /** {@inheritDoc} Forwarded to the main memory. */
    @Override
    public Set<String> keys() {
        return this.mainMemory.keys();
    }

    /** {@inheritDoc} Forwarded to the main memory. */
    @Override
    public void incrIteration() {
        this.mainMemory.incrIteration();
    }

    /** {@inheritDoc} Forwarded to the main memory. */
    @Override
    public void setIteration(final int iteration) {
        this.mainMemory.setIteration(iteration);
    }

    /** {@inheritDoc} Forwarded to the main memory. */
    @Override
    public int getIteration() {
        return this.mainMemory.getIteration();
    }

    /** {@inheritDoc} Forwarded to the main memory. */
    @Override
    public void setRuntime(final long runTime) {
        this.mainMemory.setRuntime(runTime);
    }

    /** {@inheritDoc} Forwarded to the main memory. */
    @Override
    public long getRuntime() {
        return this.mainMemory.getRuntime();
    }

    /** {@inheritDoc} Forwarded to the main memory. */
    @Override
    public boolean isInitialIteration() {
        return this.mainMemory.isInitialIteration();
    }

    /**
     * {@inheritDoc} Reads from the main memory; values buffered locally by
     * {@link #add(String, Object)} are not consulted.
     */
    @Override
    public <R> R get(final String key) throws IllegalArgumentException {
        return this.mainMemory.get(key);
    }

    /** {@inheritDoc} Written straight through to the main memory. */
    @Override
    public void set(final String key, final Object value) {
        this.mainMemory.set(key, value);
    }

    /**
     * Reduces {@code value} into the local buffer for {@code key} rather than into the main
     * memory. The first value seen for a key is stored as is; later values are combined with
     * the buffered one using the key's reducer.
     *
     * @param key   the memory key to add to
     * @param value the value to fold into the locally buffered value
     */
    @Override
    public void add(final String key, final Object value) {
        // Validation is delegated to the main memory so that both paths apply the same rules.
        this.mainMemory.checkKeyValue(key, value);
        final Object v = this.workerMemory.get(key);
        this.workerMemory.put(key, null == v ? value : this.reducers.get(key).apply(v, value));
    }

    /** Returns the string form of the main memory. */
    @Override
    public String toString() {
        return this.mainMemory.toString();
    }

    /**
     * Flushes every locally buffered value into the main memory via its {@code add} and then
     * empties the local buffer.
     * <p>
     * The buffer is cleared so that a later call (for example when this instance is reused)
     * does not add the already merged partial values to the main memory a second time.
     */
    protected void complete() {
        for (final Map.Entry<String, Object> entry : this.workerMemory.entrySet()) {
            this.mainMemory.add(entry.getKey(), entry.getValue());
        }
        this.workerMemory.clear();
    }
}