This is an automated email from the ASF dual-hosted git repository.

jin pushed a commit to branch pd-store
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git
commit 94edfcd72014851936f55704bed172541805e5be
Author: Wu Chencan <[email protected]>
AuthorDate: Tue Oct 24 06:33:24 2023 -0500

    feat(core): support batch+parallel edges traverse (#2312)

    - Enhance Consumers.java to support an exception handler and use `Future`
      to handle InterruptedException when awaiting
    - Add a nested edge iterator and support batch execution
    - Support batch execution & thread parallelism in KoutTraverser and
      KneighborTraverser
---
 .../backend/query/EdgesQueryIterator.java          |  64 ++++++
 .../org/apache/hugegraph/task/TaskManager.java     |  15 +-
 .../traversal/algorithm/HugeTraverser.java         |  45 +++++
 .../traversal/algorithm/KneighborTraverser.java    |  56 +++---
 .../traversal/algorithm/KoutTraverser.java         |  76 +++----
 .../traversal/algorithm/OltpTraverser.java         | 223 ++++++++++++++++++++-
 .../algorithm/records/KneighborRecords.java        |  14 +-
 .../hugegraph/traversal/algorithm/steps/Steps.java |   4 +
 .../java/org/apache/hugegraph/util/Consumers.java  | 128 ++++++++----
 .../backend/store/rocksdb/RocksDBStore.java        |  13 +-
 10 files changed, 516 insertions(+), 122 deletions(-)

diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/query/EdgesQueryIterator.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/query/EdgesQueryIterator.java
new file mode 100644
index 000000000..4ab9a8859
--- /dev/null
+++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/query/EdgesQueryIterator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.backend.query;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hugegraph.backend.id.Id;
+import org.apache.hugegraph.backend.tx.GraphTransaction;
+import org.apache.hugegraph.type.define.Directions;
+
+public class EdgesQueryIterator implements Iterator<Query> {
+
+    private final List<Id> labels;
+    private final Directions directions;
+    private final long limit;
+    private final Iterator<Id> sources;
+
+    public EdgesQueryIterator(Iterator<Id> sources,
+                              Directions directions,
+                              List<Id> labels,
+                              long limit) {
+        this.sources = sources;
+        this.labels = labels;
+        this.directions = directions;
+        // NO_LIMIT of the traverser is different from Query.NO_LIMIT
+        this.limit = limit < 0 ? Query.NO_LIMIT : limit;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return sources.hasNext();
+    }
+
+    @Override
+    public Query next() {
+        Id sourceId = this.sources.next();
+        ConditionQuery query = GraphTransaction.constructEdgesQuery(sourceId,
+                                                                    this.directions,
+                                                                    this.labels);
+        if (this.limit != Query.NO_LIMIT) {
+            query.limit(this.limit);
+            query.capacity(this.limit);
+        } else {
+            query.capacity(Query.NO_CAPACITY);
+        }
+        return query;
+    }
+}
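[Editor's note: an illustrative sketch of how the new iterator is driven, not
part of the patch; the id batch and limits below are made-up examples.]

    // Each next() builds one ConditionQuery per source vertex; a negative
    // limit is mapped to Query.NO_LIMIT by the constructor above.
    Iterator<Id> sources = vertexIds.iterator();             // hypothetical batch of ids
    EdgesQueryIterator queries =
            new EdgesQueryIterator(sources, Directions.OUT,
                                   Collections.emptyList(),  // empty list: no label filter
                                   10000L);                  // per-vertex degree limit
    while (queries.hasNext()) {
        Query query = queries.next();  // ready to submit, e.g. via graph.edges(query)
    }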
diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java
index 056b7ac5a..177af64ba 100644
--- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java
+++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java
@@ -26,16 +26,17 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.hugegraph.HugeException;
+import org.apache.hugegraph.HugeGraphParams;
+import org.apache.hugegraph.concurrent.PausableScheduledThreadPool;
 import org.apache.hugegraph.type.define.NodeRole;
-import org.apache.hugegraph.util.*;
 import org.apache.hugegraph.util.Consumers;
+import org.apache.hugegraph.util.E;
+import org.apache.hugegraph.util.ExecutorUtil;
 import org.apache.hugegraph.util.LockUtil;
+import org.apache.hugegraph.util.Log;
 import org.slf4j.Logger;
 
-import org.apache.hugegraph.HugeException;
-import org.apache.hugegraph.HugeGraphParams;
-import org.apache.hugegraph.concurrent.PausableScheduledThreadPool;
-
 public final class TaskManager {
 
     private static final Logger LOG = Log.logger(TaskManager.class);
@@ -53,7 +54,7 @@
     public static final String DISTRIBUTED_TASK_SCHEDULER = "distributed-scheduler-%d";
 
     protected static final long SCHEDULE_PERIOD = 1000L; // unit ms
-
+    private static final long TX_CLOSE_TIMEOUT = 30L; // unit s
     private static final int THREADS = 4;
     private static final TaskManager MANAGER = new TaskManager(THREADS);
 
@@ -184,7 +185,7 @@
                 graph.closeTx();
             } else {
                 Consumers.executeOncePerThread(this.taskExecutor, totalThreads,
-                                               graph::closeTx);
+                                               graph::closeTx, TX_CLOSE_TIMEOUT);
             }
         } catch (Exception e) {
             throw new HugeException("Exception when closing task tx", e);
diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/HugeTraverser.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/HugeTraverser.java
index f5415d9c5..194576e85 100644
--- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/HugeTraverser.java
+++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/HugeTraverser.java
@@ -17,6 +17,8 @@
 
 package org.apache.hugegraph.traversal.algorithm;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -37,6 +39,7 @@ import org.apache.hugegraph.HugeGraph;
 import org.apache.hugegraph.backend.id.Id;
 import org.apache.hugegraph.backend.query.Aggregate;
 import org.apache.hugegraph.backend.query.ConditionQuery;
+import org.apache.hugegraph.backend.query.EdgesQueryIterator;
 import org.apache.hugegraph.backend.query.Query;
 import org.apache.hugegraph.backend.query.QueryResults;
 import org.apache.hugegraph.backend.tx.GraphTransaction;
@@ -66,6 +69,7 @@ import org.apache.hugegraph.util.collection.CollectionFactory;
 import org.apache.hugegraph.util.collection.ObjectIntMapping;
 import org.apache.hugegraph.util.collection.ObjectIntMappingFactory;
 import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.util.CloseableIterator;
 import org.slf4j.Logger;
 
 import com.google.common.collect.ImmutableList;
@@ -465,6 +469,13 @@
         return edgeStep.skipSuperNodeIfNeeded(edges);
     }
 
+    public EdgesIterator edgesOfVertices(Iterator<Id> sources,
+                                         Directions dir,
+                                         List<Id> labelIds,
+                                         long degree) {
+        return new EdgesIterator(new EdgesQueryIterator(sources, dir, labelIds, degree));
+    }
+
     public Iterator<Edge> edgesOfVertex(Id source, Steps steps) {
         List<Id> edgeLabels = steps.edgeLabels();
         ConditionQuery cq = GraphTransaction.constructEdgesQuery(
@@ -474,6 +485,11 @@
             cq.limit(steps.limit());
         }
 
+        if (steps.isEdgeEmpty()) {
+            Iterator<Edge> edges = this.graph().edges(cq);
+            return edgesOfVertexStep(edges, steps);
+        }
+
         Map<Id, ConditionQuery> edgeConditions =
                 getFilterQueryConditions(steps.edgeSteps(), HugeType.EDGE);
 
@@ -1004,4 +1020,33 @@
             return edges;
         }
     }
+
+    public class EdgesIterator implements Iterator<Iterator<Edge>>, Closeable {
+
+        private final Iterator<Iterator<Edge>> currentIter;
+
+        public EdgesIterator(EdgesQueryIterator queries) {
+            List<Iterator<Edge>> iteratorList = new ArrayList<>();
+            while (queries.hasNext()) {
+                Iterator<Edge> edges = graph.edges(queries.next());
+                iteratorList.add(edges);
+            }
+            this.currentIter = iteratorList.iterator();
+        }
+
+        @Override
+        public boolean hasNext() {
+            return this.currentIter.hasNext();
+        }
+
+        @Override
+        public Iterator<Edge> next() {
+            return this.currentIter.next();
+        }
+
+        @Override
+        public void close() throws IOException {
+            CloseableIterator.closeIterator(currentIter);
+        }
+    }
 }
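[Editor's note: illustrative sketch, not part of the patch. The new
edgesOfVertices() wraps the queries above into an Iterator<Iterator<Edge>>,
one inner iterator per source vertex, and is Closeable; the traverser and id
list below are assumptions.]

    try (HugeTraverser.EdgesIterator batches =
                 traverser.edgesOfVertices(ids.iterator(), Directions.BOTH,
                                           labelIds, 10000L /* degree */)) {
        while (batches.hasNext()) {
            Iterator<Edge> edgesOfOneVertex = batches.next();
            // consume here, or hand the whole per-vertex batch to a worker
        }
    } catch (IOException e) {
        // close() delegates to CloseableIterator.closeIterator(...)
    }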
diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KneighborTraverser.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KneighborTraverser.java
index 9f16f480b..565d0af5f 100644
--- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KneighborTraverser.java
+++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KneighborTraverser.java
@@ -17,11 +17,11 @@
 
 package org.apache.hugegraph.traversal.algorithm;
 
-import java.util.Iterator;
 import java.util.Set;
 import java.util.function.Consumer;
 
 import org.apache.hugegraph.HugeGraph;
+import org.apache.hugegraph.backend.id.EdgeId;
 import org.apache.hugegraph.backend.id.Id;
 import org.apache.hugegraph.structure.HugeEdge;
 import org.apache.hugegraph.traversal.algorithm.records.KneighborRecords;
@@ -48,25 +48,27 @@ public class KneighborTraverser extends OltpTraverser {
 
         Id labelId = this.getEdgeLabelId(label);
 
-        Set<Id> latest = newSet();
-        Set<Id> all = newSet();
+        KneighborRecords records = new KneighborRecords(true, sourceV, true);
 
-        latest.add(sourceV);
-        this.vertexIterCounter.addAndGet(1L);
+        Consumer<EdgeId> consumer = edgeId -> {
+            if (this.reachLimit(limit, records.size())) {
+                return;
+            }
+            records.addPath(edgeId.ownerVertexId(), edgeId.otherVertexId());
+        };
 
         while (depth-- > 0) {
-            long remaining = limit == NO_LIMIT ?
-                             NO_LIMIT : limit - all.size();
-            latest = this.adjacentVertices(sourceV, latest, dir, labelId,
-                                           all, degree, remaining);
-            all.addAll(latest);
-            this.vertexIterCounter.addAndGet(1L);
-            this.edgeIterCounter.addAndGet(latest.size());
-            if (reachLimit(limit, all.size())) {
+            records.startOneLayer(true);
+            traverseIdsByBfs(records.keys(), dir, labelId, degree, NO_LIMIT, consumer);
+            records.finishOneLayer();
+            if (reachLimit(limit, records.size())) {
                 break;
             }
         }
 
-        return all;
+        this.vertexIterCounter.addAndGet(records.size());
+
+        return records.idsBySet(limit);
     }
 
     public KneighborRecords customizedKneighbor(Id source, Steps steps,
@@ -76,33 +78,29 @@ public class KneighborTraverser extends OltpTraverser {
         checkPositive(maxDepth, "k-neighbor max_depth");
         checkLimit(limit);
 
-        boolean concurrent = maxDepth >= this.concurrentDepth();
-
-        KneighborRecords records = new KneighborRecords(concurrent,
+        KneighborRecords records = new KneighborRecords(true,
                                                         source, true);
 
-        Consumer<Id> consumer = v -> {
+        Consumer<Edge> consumer = edge -> {
             if (this.reachLimit(limit, records.size())) {
                 return;
             }
-            Iterator<Edge> edges = edgesOfVertex(v, steps);
-            this.vertexIterCounter.addAndGet(1L);
-            while (!this.reachLimit(limit, records.size()) && edges.hasNext()) {
-                HugeEdge edge = (HugeEdge) edges.next();
-                Id target = edge.id().otherVertexId();
-                records.addPath(v, target);
-
-                records.edgeResults().addEdge(v, target, edge);
-
-                this.edgeIterCounter.addAndGet(1L);
-            }
+            EdgeId edgeId = ((HugeEdge) edge).id();
+            records.addPath(edgeId.ownerVertexId(), edgeId.otherVertexId());
+            records.edgeResults().addEdge(edgeId.ownerVertexId(), edgeId.otherVertexId(), edge);
         };
 
         while (maxDepth-- > 0) {
             records.startOneLayer(true);
-            traverseIds(records.keys(), consumer, concurrent);
+            traverseIdsByBfs(records.keys(), steps, NO_LIMIT, consumer);
             records.finishOneLayer();
+            if (this.reachLimit(limit, records.size())) {
+                break;
+            }
         }
+
+        this.vertexIterCounter.addAndGet(records.size());
+
         return records;
     }
diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KoutTraverser.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KoutTraverser.java
index 9924c766c..c683694c1 100644
--- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KoutTraverser.java
+++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KoutTraverser.java
@@ -18,12 +18,15 @@
 package org.apache.hugegraph.traversal.algorithm;
 
 import java.util.Iterator;
+import java.util.List;
 import java.util.Set;
 import java.util.function.Consumer;
 
 import org.apache.hugegraph.HugeException;
 import org.apache.hugegraph.HugeGraph;
+import org.apache.hugegraph.backend.id.EdgeId;
 import org.apache.hugegraph.backend.id.Id;
+import org.apache.hugegraph.backend.query.Query;
 import org.apache.hugegraph.structure.HugeEdge;
 import org.apache.hugegraph.traversal.algorithm.records.KoutRecords;
 import org.apache.hugegraph.traversal.algorithm.steps.Steps;
@@ -57,34 +60,45 @@ public class KoutTraverser extends OltpTraverser {
 
         Id labelId = this.getEdgeLabelId(label);
 
-        Set<Id> latest = newIdSet();
-        latest.add(sourceV);
+        Set<Id> sources = newIdSet();
+        Set<Id> neighbors = newIdSet();
+        Set<Id> visited = nearest ? newIdSet() : null;
 
-        Set<Id> all = newIdSet();
-        all.add(sourceV);
+        neighbors.add(sourceV);
+
+        ConcurrentVerticesConsumer consumer;
+
+        long remaining = capacity == NO_LIMIT ?
+                         NO_LIMIT : capacity - 1;
 
-        long remaining = capacity == NO_LIMIT ?
-                         NO_LIMIT : capacity - latest.size();
-        this.vertexIterCounter.addAndGet(1L);
         while (depth-- > 0) {
             // Just get limit nodes in last layer if limit < remaining capacity
             if (depth == 0 && limit != NO_LIMIT &&
                 (limit < remaining || remaining == NO_LIMIT)) {
                 remaining = limit;
             }
-            if (nearest) {
-                latest = this.adjacentVertices(sourceV, latest, dir, labelId,
-                                               all, degree, remaining);
-                all.addAll(latest);
-            } else {
-                latest = this.adjacentVertices(sourceV, latest, dir, labelId,
-                                               null, degree, remaining);
+
+            if (visited != null) {
+                visited.addAll(neighbors);
             }
-            this.vertexIterCounter.addAndGet(1L);
-            this.edgeIterCounter.addAndGet(latest.size());
+
+            // swap sources and neighbors
+            Set<Id> tmp = neighbors;
+            neighbors = sources;
+            sources = tmp;
+
+            // start
+            consumer = new ConcurrentVerticesConsumer(sourceV, visited, remaining, neighbors);
+
+            this.vertexIterCounter.addAndGet(sources.size());
+            this.edgeIterCounter.addAndGet(neighbors.size());
+
+            traverseIdsByBfs(sources.iterator(), dir, labelId, degree, capacity, consumer);
+
+            sources.clear();
+
             if (capacity != NO_LIMIT) {
                 // Update 'remaining' value to record remaining capacity
-                remaining -= latest.size();
+                remaining -= neighbors.size();
 
                 if (remaining <= 0 && depth > 0) {
                     throw new HugeException(
@@ -94,7 +108,7 @@ public class KoutTraverser extends OltpTraverser {
             }
         }
 
-        return latest;
+        return neighbors;
     }
 
     public KoutRecords customizedKout(Id source, Steps steps,
@@ -107,33 +121,25 @@ public class KoutTraverser extends OltpTraverser {
         checkLimit(limit);
 
         long[] depth = new long[1];
         depth[0] = maxDepth;
-        boolean concurrent = maxDepth >= this.concurrentDepth();
 
-        KoutRecords records = new KoutRecords(concurrent, source, nearest, 0);
+        KoutRecords records = new KoutRecords(true, source, nearest, 0);
 
-        Consumer<Id> consumer = v -> {
+        Consumer<Edge> consumer = edge -> {
             if (this.reachLimit(limit, depth[0], records.size())) {
                 return;
             }
-            Iterator<Edge> edges = edgesOfVertex(v, steps);
-            this.vertexIterCounter.addAndGet(1L);
-            while (!this.reachLimit(limit, depth[0], records.size()) &&
-                   edges.hasNext()) {
-                HugeEdge edge = (HugeEdge) edges.next();
-                Id target = edge.id().otherVertexId();
-                records.addPath(v, target);
-                this.checkCapacity(capacity, records.accessed(), depth[0]);
-
-                records.edgeResults().addEdge(v, target, edge);
-
-                this.edgeIterCounter.addAndGet(1L);
-            }
+            EdgeId edgeId = ((HugeEdge) edge).id();
+            records.addPath(edgeId.ownerVertexId(), edgeId.otherVertexId());
+            records.edgeResults().addEdge(edgeId.ownerVertexId(), edgeId.otherVertexId(), edge);
         };
 
         while (depth[0]-- > 0) {
+            List<Id> sources = records.ids(Query.NO_LIMIT);
             records.startOneLayer(true);
-            this.traverseIds(records.keys(), consumer, concurrent);
+            traverseIdsByBfs(sources.iterator(), steps, capacity, consumer);
+            this.vertexIterCounter.addAndGet(sources.size());
             records.finishOneLayer();
+            checkCapacity(capacity, records.accessed(), depth[0]);
         }
         return records;
     }
diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java
index b05de2422..c05d8f89f 100644
--- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java
+++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java
@@ -17,24 +17,36 @@
 
 package org.apache.hugegraph.traversal.algorithm;
 
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 
-import com.google.common.base.Objects;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hugegraph.HugeGraph;
+import org.apache.hugegraph.backend.id.EdgeId;
 import org.apache.hugegraph.backend.id.Id;
+import org.apache.hugegraph.backend.query.EdgesQueryIterator;
 import org.apache.hugegraph.config.CoreOptions;
+import org.apache.hugegraph.iterator.FilterIterator;
+import org.apache.hugegraph.iterator.MapperIterator;
+import org.apache.hugegraph.structure.HugeEdge;
+import org.apache.hugegraph.traversal.algorithm.steps.Steps;
+import org.apache.hugegraph.type.define.Directions;
 import org.apache.hugegraph.util.Consumers;
+import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Element;
 import org.apache.tinkerpop.gremlin.structure.Property;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.util.CloseableIterator;
-import org.apache.hugegraph.iterator.FilterIterator;
+
+import com.google.common.base.Objects;
 
 public abstract class OltpTraverser extends HugeTraverser
         implements AutoCloseable {
@@ -75,7 +87,7 @@
 
     protected long traversePairs(Iterator<Pair<Id, Id>> pairs,
                                  Consumer<Pair<Id, Id>> consumer) {
-        return this.traverse(pairs, consumer, "traverse-pairs");
+        return this.traverseByOne(pairs, consumer, "traverse-pairs");
    }
 
     protected long traverseIds(Iterator<Id> ids, Consumer<Id> consumer,
@@ -93,18 +105,19 @@
     }
 
     protected long traverseIds(Iterator<Id> ids, Consumer<Id> consumer) {
-        return this.traverse(ids, consumer, "traverse-ids");
+        return this.traverseByOne(ids, consumer, "traverse-ids");
     }
 
-    protected <K> long traverse(Iterator<K> iterator, Consumer<K> consumer,
-                                String name) {
+    protected <K> long traverseByOne(Iterator<K> iterator,
+                                     Consumer<K> consumer,
+                                     String taskName) {
         if (!iterator.hasNext()) {
             return 0L;
         }
 
         Consumers<K> consumers = new Consumers<>(executors.getExecutor(),
                                                  consumer, null);
-        consumers.start(name);
+        consumers.start(taskName);
         long total = 0L;
         try {
             while (iterator.hasNext()) {
@@ -129,11 +142,101 @@
         return total;
     }
 
+    protected void traverseIdsByBfs(Iterator<Id> vertices,
+                                    Directions dir,
+                                    Id label,
+                                    long degree,
+                                    long capacity,
+                                    Consumer<EdgeId> consumer) {
+        List<Id> labels = label == null ? Collections.emptyList() :
+                          Collections.singletonList(label);
+        OneStepEdgeIterConsumer edgeIterConsumer = new OneStepEdgeIterConsumer(consumer, capacity);
+
+        EdgesIterator edgeIter = edgesOfVertices(vertices, dir, labels, degree);
+
+        // parallel out-of-order execution
+        this.traverseByBatch(edgeIter, edgeIterConsumer, "traverse-bfs-step", 1);
+    }
+
+    protected void traverseIdsByBfs(Iterator<Id> vertices,
+                                    Steps steps,
+                                    long capacity,
+                                    Consumer<Edge> consumer) {
+        StepsEdgeIterConsumer edgeIterConsumer =
+                new StepsEdgeIterConsumer(consumer, capacity, steps);
+
+        EdgesQueryIterator queryIterator = new EdgesQueryIterator(vertices,
+                                                                  steps.direction(),
+                                                                  steps.edgeLabels(),
+                                                                  steps.degree());
+
+        // get Iterator<Iterator<edges>> from Iterator<Query>
+        EdgesIterator edgeIter = new EdgesIterator(queryIterator);
+
+        // parallel out-of-order execution
+        this.traverseByBatch(edgeIter, edgeIterConsumer, "traverse-bfs-steps", 1);
+    }
+
+    protected <K> long traverseByBatch(Iterator<Iterator<K>> sources,
+                                       Consumer<Iterator<K>> consumer,
+                                       String taskName, int concurrentWorkers) {
+        if (!sources.hasNext()) {
+            return 0L;
+        }
+        AtomicBoolean done = new AtomicBoolean(false);
+        Consumers<Iterator<K>> consumers = null;
+        try {
+            consumers = buildConsumers(consumer, concurrentWorkers, done,
+                                       executors.getExecutor());
+            return startConsumers(sources, taskName, done, consumers);
+        } finally {
+            assert consumers != null;
+            executors.returnExecutor(consumers.executor());
+        }
+    }
+
+    private <K> long startConsumers(Iterator<Iterator<K>> sources,
+                                    String taskName,
+                                    AtomicBoolean done,
+                                    Consumers<Iterator<K>> consumers) {
+        long total = 0L;
+        try {
+            consumers.start(taskName);
+            while (sources.hasNext() && !done.get()) {
+                total++;
+                Iterator<K> v = sources.next();
+                consumers.provide(v);
+            }
+        } catch (Consumers.StopExecution e) {
+            // pass
+        } catch (Throwable e) {
+            throw Consumers.wrapException(e);
+        } finally {
+            try {
+                consumers.await();
+            } catch (Throwable e) {
+                throw Consumers.wrapException(e);
+            } finally {
+                CloseableIterator.closeIterator(sources);
+            }
+        }
+        return total;
+    }
+
+    private <K> Consumers<Iterator<K>> buildConsumers(Consumer<Iterator<K>> consumer,
+                                                      int queueSizePerWorker,
+                                                      AtomicBoolean done,
+                                                      ExecutorService executor) {
+        return new Consumers<>(executor,
+                               consumer,
+                               null,
+                               e -> done.set(true),
+                               queueSizePerWorker);
+    }
+
     protected Iterator<Vertex> filter(Iterator<Vertex> vertices,
                                       String key, Object value) {
-        return new FilterIterator<>(vertices, vertex -> {
-            return match(vertex, key, value);
-        });
+        return new FilterIterator<>(vertices, vertex -> match(vertex, key, value));
     }
 
     protected boolean match(Element elem, String key, Object value) {
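[Editor's note: illustrative sketch, not part of the patch. Inside an
OltpTraverser subclass, the EdgeId-based overload can expand one BFS layer
like this; the frontier set and limits are assumptions.]

    Set<Id> nextLayer = ConcurrentHashMap.newKeySet();  // consumer runs on many threads
    this.traverseIdsByBfs(frontier.iterator(),          // hypothetical current layer
                          Directions.OUT,
                          null,                         // null label means all labels
                          10000L,                       // per-vertex degree limit
                          NO_LIMIT,                     // no overall capacity cap
                          edgeId -> nextLayer.add(edgeId.otherVertexId()));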
@@ -175,4 +278,104 @@ public abstract class OltpTraverser extends HugeTraverser
             return values;
         }
     }
+
+    public static class ConcurrentVerticesConsumer implements Consumer<EdgeId> {
+
+        private final Id sourceV;
+        private final Set<Id> excluded;
+        private final Set<Id> neighbors;
+        private final long limit;
+        private final AtomicInteger count;
+
+        public ConcurrentVerticesConsumer(Id sourceV, Set<Id> excluded, long limit,
+                                          Set<Id> neighbors) {
+            this.sourceV = sourceV;
+            this.excluded = excluded;
+            this.limit = limit;
+            this.neighbors = neighbors;
+            this.count = new AtomicInteger(0);
+        }
+
+        @Override
+        public void accept(EdgeId edgeId) {
+            if (this.limit != NO_LIMIT && count.get() >= this.limit) {
+                throw new Consumers.StopExecution("reach limit");
+            }
+
+            Id targetV = edgeId.otherVertexId();
+            if (this.sourceV.equals(targetV)) {
+                return;
+            }
+
+            if (this.excluded != null && this.excluded.contains(targetV)) {
+                return;
+            }
+
+            if (this.neighbors.add(targetV)) {
+                if (this.limit != NO_LIMIT) {
+                    this.count.getAndIncrement();
+                }
+            }
+        }
+    }
+
+    public abstract class EdgesConsumer<T, E> implements Consumer<Iterator<T>> {
+
+        private final Consumer<E> consumer;
+        private final long capacity;
+
+        public EdgesConsumer(Consumer<E> consumer, long capacity) {
+            this.consumer = consumer;
+            this.capacity = capacity;
+        }
+
+        protected abstract Iterator<E> prepare(Iterator<T> iter);
+
+        @Override
+        public void accept(Iterator<T> edgeIter) {
+            Iterator<E> ids = prepare(edgeIter);
+            long counter = 0;
+            while (ids.hasNext()) {
+                if (Thread.currentThread().isInterrupted()) {
+                    LOG.warn("Consumer is interrupted");
+                    break;
+                }
+                counter++;
+                this.consumer.accept(ids.next());
+            }
+            long total = edgeIterCounter.addAndGet(counter);
+            // traverse by batch & improve performance
+            if (this.capacity != NO_LIMIT && total >= this.capacity) {
+                throw new Consumers.StopExecution("reach capacity");
+            }
+        }
+    }
+
+    public class OneStepEdgeIterConsumer extends EdgesConsumer<Edge, EdgeId> {
+
+        public OneStepEdgeIterConsumer(Consumer<EdgeId> consumer, long capacity) {
+            super(consumer, capacity);
+        }
+
+        @Override
+        protected Iterator<EdgeId> prepare(Iterator<Edge> edgeIter) {
+            return new MapperIterator<>(edgeIter, (e) -> ((HugeEdge) e).id());
+        }
+    }
+
+    public class StepsEdgeIterConsumer extends EdgesConsumer<Edge, Edge> {
+
+        private final Steps steps;
+
+        public StepsEdgeIterConsumer(Consumer<Edge> consumer, long capacity,
+                                     Steps steps) {
+            super(consumer, capacity);
+            this.steps = steps;
+        }
+
+        @Override
+        protected Iterator<Edge> prepare(Iterator<Edge> edgeIter) {
+            return edgesOfVertexStep(edgeIter, this.steps);
+        }
+    }
 }
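[Editor's note: illustrative sketch, not part of the patch. This is roughly
how the plain k-out path above wires these pieces together; the sets and
counters are assumptions.]

    Set<Id> visited = ConcurrentHashMap.newKeySet();    // vertices from earlier layers
    Set<Id> nextLayer = ConcurrentHashMap.newKeySet();  // filled concurrently
    ConcurrentVerticesConsumer consumer =
            new ConcurrentVerticesConsumer(sourceV,     // drop edges looping back to source
                                           visited,     // drop already-visited vertices
                                           remaining,   // StopExecution once this many found
                                           nextLayer);
    this.traverseIdsByBfs(currentLayer.iterator(), dir, labelId, degree,
                          capacity, consumer);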
diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/records/KneighborRecords.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/records/KneighborRecords.java
index 7e04a286c..649b1c211 100644
--- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/records/KneighborRecords.java
+++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/records/KneighborRecords.java
@@ -19,7 +19,9 @@ package org.apache.hugegraph.traversal.algorithm.records;
 
 import static org.apache.hugegraph.traversal.algorithm.HugeTraverser.NO_LIMIT;
 
+import java.util.Collection;
 import java.util.List;
+import java.util.Set;
 import java.util.Stack;
 
 import org.apache.hugegraph.backend.id.Id;
@@ -45,6 +47,17 @@ public class KneighborRecords extends SingleWayMultiPathsRecords {
     @Override
     public List<Id> ids(long limit) {
         List<Id> ids = CollectionFactory.newList(CollectionType.EC);
+        this.getRecords(limit, ids);
+        return ids;
+    }
+
+    public Set<Id> idsBySet(long limit) {
+        Set<Id> ids = CollectionFactory.newSet(CollectionType.EC);
+        this.getRecords(limit, ids);
+        return ids;
+    }
+
+    private void getRecords(long limit, Collection<Id> ids) {
         Stack<Record> records = this.records();
         // Not include record(i=0) to ignore source vertex
         for (int i = 1; i < records.size(); i++) {
@@ -54,7 +67,6 @@ public class KneighborRecords extends SingleWayMultiPathsRecords {
                 limit--;
             }
         }
-        return ids;
     }
 
     @Override
diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/steps/Steps.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/steps/Steps.java
index d1a9238be..c2a1a7e1e 100644
--- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/steps/Steps.java
+++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/steps/Steps.java
@@ -138,6 +138,10 @@ public class Steps {
         return new ArrayList<>(this.edgeSteps.keySet());
     }
 
+    public boolean isEdgeEmpty() {
+        return this.edgeSteps.isEmpty();
+    }
+
     public boolean isVertexEmpty() {
         return this.vertexSteps.isEmpty();
     }
diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java
index 00689e0c5..06e678fd9 100644
--- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java
+++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java
@@ -27,16 +27,16 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 
-import org.apache.hugegraph.config.CoreOptions;
-import org.slf4j.Logger;
-
 import org.apache.hugegraph.HugeException;
+import org.apache.hugegraph.config.CoreOptions;
 import org.apache.hugegraph.task.TaskManager.ContextCallable;
+import org.slf4j.Logger;
 
 public final class Consumers<V> {
 
@@ -46,16 +46,16 @@
 
     private static final Logger LOG = Log.logger(Consumers.class);
 
+    private final V QUEUE_END = (V) new Object();
     private final ExecutorService executor;
     private final Consumer<V> consumer;
-    private final Runnable done;
-
+    private final Runnable doneHandle;
+    private final Consumer<Throwable> exceptionHandle;
     private final int workers;
+    private final List<Future> runningFutures;
     private final int queueSize;
     private final CountDownLatch latch;
     private final BlockingQueue<V> queue;
-
-    private volatile boolean ending = false;
     private volatile Throwable exception = null;
 
     public Consumers(ExecutorService executor, Consumer<V> consumer) {
@@ -63,23 +63,40 @@
     }
 
     public Consumers(ExecutorService executor,
-                     Consumer<V> consumer, Runnable done) {
+                     Consumer<V> consumer, Runnable doneHandle) {
+        this(executor, consumer, doneHandle, QUEUE_WORKER_SIZE);
+    }
+
+    public Consumers(ExecutorService executor,
+                     Consumer<V> consumer,
+                     Runnable doneHandle,
+                     int queueSizePerWorker) {
+        this(executor, consumer, doneHandle, null, queueSizePerWorker);
+    }
+
+    public Consumers(ExecutorService executor,
+                     Consumer<V> consumer,
+                     Runnable doneHandle,
+                     Consumer<Throwable> exceptionHandle,
+                     int queueSizePerWorker) {
         this.executor = executor;
         this.consumer = consumer;
-        this.done = done;
+        this.doneHandle = doneHandle;
+        this.exceptionHandle = exceptionHandle;
         int workers = THREADS;
         if (this.executor instanceof ThreadPoolExecutor) {
             workers = ((ThreadPoolExecutor) this.executor).getCorePoolSize();
         }
         this.workers = workers;
-        this.queueSize = QUEUE_WORKER_SIZE * workers;
+
+        this.runningFutures = new ArrayList<>(workers);
+        this.queueSize = queueSizePerWorker * workers + 1;
         this.latch = new CountDownLatch(workers);
         this.queue = new ArrayBlockingQueue<>(this.queueSize);
     }
 
     public void start(String name) {
-        this.ending = false;
         this.exception = null;
         if (this.executor == null) {
             return;
@@ -87,7 +104,8 @@
         LOG.info("Starting {} workers[{}] with queue size {}...",
                  this.workers, name, this.queueSize);
         for (int i = 0; i < this.workers; i++) {
-            this.executor.submit(new ContextCallable<>(this::runAndDone));
+            this.runningFutures.add(
+                    this.executor.submit(new ContextCallable<>(this::runAndDone)));
         }
     }
 
@@ -95,11 +113,15 @@
         try {
             this.run();
         } catch (Throwable e) {
-            // Only the first exception of one thread can be stored
-            this.exception = e;
-            if (!(e instanceof StopExecution)) {
+            if (e instanceof StopExecution) {
+                this.queue.clear();
+                putQueueEnd();
+            } else {
+                // Only the first exception of one thread can be stored
+                this.exception = e;
                 LOG.error("Error when running task", e);
             }
+            exceptionHandle(e);
         } finally {
             this.done();
             this.latch.countDown();
@@ -109,11 +131,7 @@
 
     private void run() {
         LOG.debug("Start to work...");
-        while (!this.ending) {
-            this.consume();
-        }
-        assert this.ending;
-        while (this.consume()){
+        while (this.consume()) {
             // ignore
         }
 
@@ -121,14 +139,18 @@
     private boolean consume() {
-        V elem;
-        try {
-            elem = this.queue.poll(CONSUMER_WAKE_PERIOD, TimeUnit.MILLISECONDS);
-        } catch (InterruptedException e) {
-            // ignore
-            return true;
+        V elem = null;
+        while (elem == null) {
+            try {
+                elem = this.queue.poll(CONSUMER_WAKE_PERIOD, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                // ignore
+                return false;
+            }
         }
-        if (elem == null) {
+
+        if (elem == QUEUE_END) {
+            putQueueEnd();
             return false;
         }
         // do job
@@ -136,13 +158,29 @@
         return true;
     }
 
+    private void exceptionHandle(Throwable e) {
+        if (this.exceptionHandle == null) {
+            return;
+        }
+
+        try {
+            this.exceptionHandle.accept(e);
+        } catch (Throwable ex) {
+            if (this.exception == null) {
+                this.exception = ex;
+            } else {
+                LOG.warn("Error while calling exceptionHandle()", ex);
+            }
+        }
+    }
+
     private void done() {
-        if (this.done == null) {
+        if (this.doneHandle == null) {
             return;
         }
 
         try {
-            this.done.run();
+            this.doneHandle.run();
         } catch (Throwable e) {
             if (this.exception == null) {
                 this.exception = e;
@@ -169,6 +207,16 @@
         } else {
             try {
                 this.queue.put(v);
+            } catch (InterruptedException e) {
+                LOG.warn("Interrupted while enqueueing element", e);
+            }
+        }
+    }
+
+    private void putQueueEnd() {
+        if (this.executor != null) {
+            try {
+                this.queue.put(QUEUE_END);
             } catch (InterruptedException e) {
                 LOG.warn("Interrupted while enqueue", e);
             }
         }
     }
 
     public void await() throws Throwable {
-        this.ending = true;
         if (this.executor == null) {
             // call done() directly if without thread pool
             this.done();
         } else {
             try {
+                putQueueEnd();
                 this.latch.await();
             } catch (InterruptedException e) {
                 String error = "Interrupted while waiting for consumers";
+                for (Future f : this.runningFutures) {
+                    f.cancel(true);
+                }
                 this.exception = new HugeException(error, e);
                 LOG.warn(error, e);
             }
@@ -201,7 +252,8 @@
 
     public static void executeOncePerThread(ExecutorService executor,
                                             int totalThreads,
-                                            Runnable callback)
+                                            Runnable callback,
+                                            long invokeTimeout)
                                             throws InterruptedException {
         // Ensure callback execute at least once for every thread
         final Map<Thread, Integer> threadsTimes = new ConcurrentHashMap<>();
@@ -230,7 +282,7 @@
         for (int i = 0; i < totalThreads; i++) {
             tasks.add(task);
         }
-        executor.invokeAll(tasks);
+        executor.invokeAll(tasks, invokeTimeout, TimeUnit.SECONDS);
     }
 
     public static ExecutorService newThreadPool(String prefix, int workers) {
@@ -290,13 +342,21 @@
 
         public synchronized void returnExecutor(ExecutorService executor) {
             E.checkNotNull(executor, "executor");
             if (!this.executors.offer(executor)) {
-                executor.shutdown();
+                try {
+                    executor.shutdown();
+                } catch (Exception e) {
+                    LOG.warn("close ExecutorService with error:", e);
+                }
             }
         }
 
         public synchronized void destroy() {
             for (ExecutorService executor : this.executors) {
-                executor.shutdown();
+                try {
+                    executor.shutdownNow();
+                } catch (Exception e) {
+                    LOG.warn("close ExecutorService with error:", e);
+                }
             }
             this.executors.clear();
         }
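[Editor's note: illustrative sketch, not part of the patch. The reworked
lifecycle replaces the old `ending` flag with the QUEUE_END sentinel: await()
enqueues it once, and each worker that dequeues it re-enqueues it for the next
worker and exits. A producer written against the new constructor (names are
assumptions) looks roughly like:]

    Consumers<String> consumers =
            new Consumers<>(executor,
                            item -> process(item),           // per-item work on workers
                            () -> LOG.debug("worker done"),  // doneHandle, once per worker
                            e -> stopFlag.set(true),         // exceptionHandle
                            1);                              // queue slots per worker
    consumers.start("example-task");
    try {
        for (String item : items) {
            consumers.provide(item);  // blocks while the bounded queue is full
        }
    } finally {
        consumers.await();            // posts QUEUE_END and waits on the latch
    }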
diff --git a/hugegraph-server/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java b/hugegraph-server/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java
index 1d0cdba7b..ca1058b9a 100644
--- a/hugegraph-server/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java
+++ b/hugegraph-server/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java
@@ -44,9 +44,6 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import org.apache.commons.io.FileUtils;
-import org.rocksdb.RocksDBException;
-import org.slf4j.Logger;
-
 import org.apache.hugegraph.HugeException;
 import org.apache.hugegraph.backend.BackendException;
 import org.apache.hugegraph.backend.id.Id;
@@ -69,6 +66,9 @@ import org.apache.hugegraph.util.E;
 import org.apache.hugegraph.util.ExecutorUtil;
 import org.apache.hugegraph.util.InsertionOrderUtil;
 import org.apache.hugegraph.util.Log;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+
 import com.google.common.collect.ImmutableList;
 
 public abstract class RocksDBStore extends AbstractBackendStore<RocksDBSessions.Session> {
@@ -93,7 +93,8 @@ public abstract class RocksDBStore extends AbstractBackendStore<RocksDBSessions.
     private static final String TABLE_GENERAL_KEY = "general";
     private static final String DB_OPEN = "db-open-%s";
 
-    private static final long OPEN_TIMEOUT = 600L;
+    private static final long DB_OPEN_TIMEOUT = 600L; // unit s
+    private static final long DB_CLOSE_TIMEOUT = 30L; // unit s
     /*
      * This is threads number used to concurrently opening RocksDB dbs,
      * 8 is supposed enough due to configurable data disks and
@@ -279,7 +280,7 @@ public abstract class RocksDBStore extends AbstractBackendStore<RocksDBSessions.
         this.useSessions();
         try {
             Consumers.executeOncePerThread(openPool, OPEN_POOL_THREADS,
-                                           this::closeSessions);
+                                           this::closeSessions, DB_CLOSE_TIMEOUT);
         } catch (InterruptedException e) {
             throw new BackendException("Failed to close session opened by " +
                                        "open-pool");
@@ -288,7 +289,7 @@ public abstract class RocksDBStore extends AbstractBackendStore<RocksDBSessions.
         boolean terminated;
         openPool.shutdown();
         try {
-            terminated = openPool.awaitTermination(OPEN_TIMEOUT,
+            terminated = openPool.awaitTermination(DB_OPEN_TIMEOUT,
                                                    TimeUnit.SECONDS);
         } catch (Throwable e) {
             throw new BackendException(
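[Editor's note: the email is truncated above, mid-hunk, in the archive.
Illustrative sketch, not part of the patch: both call sites now pass a
timeout in seconds to executeOncePerThread(), which forwards it to
ExecutorService.invokeAll(tasks, timeout, TimeUnit.SECONDS). A hypothetical
caller closing one thread-local resource per pool thread:]

    // `pool` is a fixed pool with POOL_THREADS threads; wait at most 30s.
    Consumers.executeOncePerThread(pool, POOL_THREADS, resource::close, 30L);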
