This is an automated email from the ASF dual-hosted git repository.
jin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git
The following commit(s) were added to refs/heads/master by this push:
new 8db0a9b19 feat(core): support batch+parallel edges traverse (#2312)
8db0a9b19 is described below
commit 8db0a9b19532ba7d05303009e404602cc5048762
Author: Wu Chencan <[email protected]>
AuthorDate: Tue Oct 24 06:33:24 2023 -0500
feat(core): support batch+parallel edges traverse (#2312)
## Main Changes
- Enhance Consumers.java, supporting ExceptionHandle and `Future` to handle
InterruptedException when awaiting
- Add Nested Iterator Edge and support batch execution
- Support batch execution & thread parallel in KoutTraverser and Kneighbor
---
.../backend/query/EdgesQueryIterator.java | 64 ++++++
.../org/apache/hugegraph/task/TaskManager.java | 15 +-
.../traversal/algorithm/HugeTraverser.java | 45 +++++
.../traversal/algorithm/KneighborTraverser.java | 56 +++---
.../traversal/algorithm/KoutTraverser.java | 76 +++----
.../traversal/algorithm/OltpTraverser.java | 223 ++++++++++++++++++++-
.../algorithm/records/KneighborRecords.java | 14 +-
.../hugegraph/traversal/algorithm/steps/Steps.java | 4 +
.../java/org/apache/hugegraph/util/Consumers.java | 128 ++++++++----
.../backend/store/rocksdb/RocksDBStore.java | 13 +-
10 files changed, 516 insertions(+), 122 deletions(-)
diff --git
a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/query/EdgesQueryIterator.java
b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/query/EdgesQueryIterator.java
new file mode 100644
index 000000000..4ab9a8859
--- /dev/null
+++
b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/query/EdgesQueryIterator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.backend.query;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hugegraph.backend.id.Id;
+import org.apache.hugegraph.backend.tx.GraphTransaction;
+import org.apache.hugegraph.type.define.Directions;
+
+public class EdgesQueryIterator implements Iterator<Query> {
+
+ private final List<Id> labels;
+ private final Directions directions;
+ private final long limit;
+ private final Iterator<Id> sources;
+
+ public EdgesQueryIterator(Iterator<Id> sources,
+ Directions directions,
+ List<Id> labels,
+ long limit) {
+ this.sources = sources;
+ this.labels = labels;
+ this.directions = directions;
+        // Note: Traverse NO_LIMIT differs from Query.NO_LIMIT
+ this.limit = limit < 0 ? Query.NO_LIMIT : limit;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return sources.hasNext();
+ }
+
+ @Override
+ public Query next() {
+ Id sourceId = this.sources.next();
+ ConditionQuery query = GraphTransaction.constructEdgesQuery(sourceId,
+
this.directions,
+
this.labels);
+ if (this.limit != Query.NO_LIMIT) {
+ query.limit(this.limit);
+ query.capacity(this.limit);
+ } else {
+ query.capacity(Query.NO_CAPACITY);
+ }
+ return query;
+ }
+}
diff --git
a/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java
b/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java
index 524a1f759..0ad96f443 100644
--- a/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java
+++ b/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java
@@ -26,16 +26,17 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import org.apache.hugegraph.HugeException;
+import org.apache.hugegraph.HugeGraphParams;
+import org.apache.hugegraph.concurrent.PausableScheduledThreadPool;
import org.apache.hugegraph.type.define.NodeRole;
-import org.apache.hugegraph.util.*;
import org.apache.hugegraph.util.Consumers;
+import org.apache.hugegraph.util.E;
+import org.apache.hugegraph.util.ExecutorUtil;
import org.apache.hugegraph.util.LockUtil;
+import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;
-import org.apache.hugegraph.HugeException;
-import org.apache.hugegraph.HugeGraphParams;
-import org.apache.hugegraph.concurrent.PausableScheduledThreadPool;
-
public final class TaskManager {
private static final Logger LOG = Log.logger(TaskManager.class);
@@ -48,7 +49,7 @@ public final class TaskManager {
public static final String TASK_SCHEDULER = "task-scheduler-%d";
protected static final long SCHEDULE_PERIOD = 1000L; // unit ms
-
+ private static final long TX_CLOSE_TIMEOUT = 30L; // unit s
private static final int THREADS = 4;
private static final TaskManager MANAGER = new TaskManager(THREADS);
@@ -134,7 +135,7 @@ public final class TaskManager {
graph.closeTx();
} else {
Consumers.executeOncePerThread(this.taskExecutor, totalThreads,
- graph::closeTx);
+ graph::closeTx,
TX_CLOSE_TIMEOUT);
}
} catch (Exception e) {
throw new HugeException("Exception when closing task tx", e);
diff --git
a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/HugeTraverser.java
b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/HugeTraverser.java
index f5415d9c5..194576e85 100644
---
a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/HugeTraverser.java
+++
b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/HugeTraverser.java
@@ -17,6 +17,8 @@
package org.apache.hugegraph.traversal.algorithm;
+import java.io.Closeable;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -37,6 +39,7 @@ import org.apache.hugegraph.HugeGraph;
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.query.Aggregate;
import org.apache.hugegraph.backend.query.ConditionQuery;
+import org.apache.hugegraph.backend.query.EdgesQueryIterator;
import org.apache.hugegraph.backend.query.Query;
import org.apache.hugegraph.backend.query.QueryResults;
import org.apache.hugegraph.backend.tx.GraphTransaction;
@@ -66,6 +69,7 @@ import org.apache.hugegraph.util.collection.CollectionFactory;
import org.apache.hugegraph.util.collection.ObjectIntMapping;
import org.apache.hugegraph.util.collection.ObjectIntMappingFactory;
import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.util.CloseableIterator;
import org.slf4j.Logger;
import com.google.common.collect.ImmutableList;
@@ -465,6 +469,13 @@ public class HugeTraverser {
return edgeStep.skipSuperNodeIfNeeded(edges);
}
+ public EdgesIterator edgesOfVertices(Iterator<Id> sources,
+ Directions dir,
+ List<Id> labelIds,
+ long degree) {
+ return new EdgesIterator(new EdgesQueryIterator(sources, dir,
labelIds, degree));
+ }
+
public Iterator<Edge> edgesOfVertex(Id source, Steps steps) {
List<Id> edgeLabels = steps.edgeLabels();
ConditionQuery cq = GraphTransaction.constructEdgesQuery(
@@ -474,6 +485,11 @@ public class HugeTraverser {
cq.limit(steps.limit());
}
+ if (steps.isEdgeEmpty()) {
+ Iterator<Edge> edges = this.graph().edges(cq);
+ return edgesOfVertexStep(edges, steps);
+ }
+
Map<Id, ConditionQuery> edgeConditions =
getFilterQueryConditions(steps.edgeSteps(), HugeType.EDGE);
@@ -1004,4 +1020,33 @@ public class HugeTraverser {
return edges;
}
}
+
+ public class EdgesIterator implements Iterator<Iterator<Edge>>, Closeable {
+
+ private final Iterator<Iterator<Edge>> currentIter;
+
+ public EdgesIterator(EdgesQueryIterator queries) {
+ List<Iterator<Edge>> iteratorList = new ArrayList<>();
+ while (queries.hasNext()) {
+ Iterator<Edge> edges = graph.edges(queries.next());
+ iteratorList.add(edges);
+ }
+ this.currentIter = iteratorList.iterator();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return this.currentIter.hasNext();
+ }
+
+ @Override
+ public Iterator<Edge> next() {
+ return this.currentIter.next();
+ }
+
+ @Override
+ public void close() throws IOException {
+ CloseableIterator.closeIterator(currentIter);
+ }
+ }
}
diff --git
a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KneighborTraverser.java
b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KneighborTraverser.java
index 9f16f480b..565d0af5f 100644
---
a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KneighborTraverser.java
+++
b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KneighborTraverser.java
@@ -17,11 +17,11 @@
package org.apache.hugegraph.traversal.algorithm;
-import java.util.Iterator;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.hugegraph.HugeGraph;
+import org.apache.hugegraph.backend.id.EdgeId;
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.structure.HugeEdge;
import org.apache.hugegraph.traversal.algorithm.records.KneighborRecords;
@@ -48,25 +48,27 @@ public class KneighborTraverser extends OltpTraverser {
Id labelId = this.getEdgeLabelId(label);
- Set<Id> latest = newSet();
- Set<Id> all = newSet();
+ KneighborRecords records = new KneighborRecords(true, sourceV, true);
- latest.add(sourceV);
- this.vertexIterCounter.addAndGet(1L);
+ Consumer<EdgeId> consumer = edgeId -> {
+ if (this.reachLimit(limit, records.size())) {
+ return;
+ }
+ records.addPath(edgeId.ownerVertexId(), edgeId.otherVertexId());
+ };
while (depth-- > 0) {
- long remaining = limit == NO_LIMIT ? NO_LIMIT : limit - all.size();
- latest = this.adjacentVertices(sourceV, latest, dir, labelId,
- all, degree, remaining);
- all.addAll(latest);
- this.vertexIterCounter.addAndGet(1L);
- this.edgeIterCounter.addAndGet(latest.size());
- if (reachLimit(limit, all.size())) {
+ records.startOneLayer(true);
+ traverseIdsByBfs(records.keys(), dir, labelId, degree, NO_LIMIT,
consumer);
+ records.finishOneLayer();
+ if (reachLimit(limit, records.size())) {
break;
}
}
- return all;
+ this.vertexIterCounter.addAndGet(records.size());
+
+ return records.idsBySet(limit);
}
public KneighborRecords customizedKneighbor(Id source, Steps steps,
@@ -76,33 +78,29 @@ public class KneighborTraverser extends OltpTraverser {
checkPositive(maxDepth, "k-neighbor max_depth");
checkLimit(limit);
- boolean concurrent = maxDepth >= this.concurrentDepth();
-
- KneighborRecords records = new KneighborRecords(concurrent,
+ KneighborRecords records = new KneighborRecords(true,
source, true);
- Consumer<Id> consumer = v -> {
+ Consumer<Edge> consumer = edge -> {
if (this.reachLimit(limit, records.size())) {
return;
}
- Iterator<Edge> edges = edgesOfVertex(v, steps);
- this.vertexIterCounter.addAndGet(1L);
- while (!this.reachLimit(limit, records.size()) && edges.hasNext())
{
- HugeEdge edge = (HugeEdge) edges.next();
- Id target = edge.id().otherVertexId();
- records.addPath(v, target);
-
- records.edgeResults().addEdge(v, target, edge);
-
- this.edgeIterCounter.addAndGet(1L);
- }
+ EdgeId edgeId = ((HugeEdge) edge).id();
+ records.addPath(edgeId.ownerVertexId(), edgeId.otherVertexId());
+ records.edgeResults().addEdge(edgeId.ownerVertexId(),
edgeId.otherVertexId(), edge);
};
while (maxDepth-- > 0) {
records.startOneLayer(true);
- traverseIds(records.keys(), consumer, concurrent);
+ traverseIdsByBfs(records.keys(), steps, NO_LIMIT, consumer);
records.finishOneLayer();
+ if (this.reachLimit(limit, records.size())) {
+ break;
+ }
}
+
+ this.vertexIterCounter.addAndGet(records.size());
+
return records;
}
diff --git
a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KoutTraverser.java
b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KoutTraverser.java
index 9924c766c..c683694c1 100644
---
a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KoutTraverser.java
+++
b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/KoutTraverser.java
@@ -18,12 +18,15 @@
package org.apache.hugegraph.traversal.algorithm;
import java.util.Iterator;
+import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.hugegraph.HugeException;
import org.apache.hugegraph.HugeGraph;
+import org.apache.hugegraph.backend.id.EdgeId;
import org.apache.hugegraph.backend.id.Id;
+import org.apache.hugegraph.backend.query.Query;
import org.apache.hugegraph.structure.HugeEdge;
import org.apache.hugegraph.traversal.algorithm.records.KoutRecords;
import org.apache.hugegraph.traversal.algorithm.steps.Steps;
@@ -57,34 +60,45 @@ public class KoutTraverser extends OltpTraverser {
Id labelId = this.getEdgeLabelId(label);
- Set<Id> latest = newIdSet();
- latest.add(sourceV);
+ Set<Id> sources = newIdSet();
+ Set<Id> neighbors = newIdSet();
+ Set<Id> visited = nearest ? newIdSet() : null;
- Set<Id> all = newIdSet();
- all.add(sourceV);
+ neighbors.add(sourceV);
+
+ ConcurrentVerticesConsumer consumer;
+
+ long remaining = capacity == NO_LIMIT ? NO_LIMIT : capacity - 1;
- long remaining = capacity == NO_LIMIT ?
- NO_LIMIT : capacity - latest.size();
- this.vertexIterCounter.addAndGet(1L);
while (depth-- > 0) {
// Just get limit nodes in last layer if limit < remaining capacity
if (depth == 0 && limit != NO_LIMIT &&
(limit < remaining || remaining == NO_LIMIT)) {
remaining = limit;
}
- if (nearest) {
- latest = this.adjacentVertices(sourceV, latest, dir, labelId,
- all, degree, remaining);
- all.addAll(latest);
- } else {
- latest = this.adjacentVertices(sourceV, latest, dir, labelId,
- null, degree, remaining);
+
+ if (visited != null) {
+ visited.addAll(neighbors);
}
- this.vertexIterCounter.addAndGet(1L);
- this.edgeIterCounter.addAndGet(latest.size());
+
+ // swap sources and neighbors
+ Set<Id> tmp = neighbors;
+ neighbors = sources;
+ sources = tmp;
+
+ // start
+ consumer = new ConcurrentVerticesConsumer(sourceV, visited,
remaining, neighbors);
+
+ this.vertexIterCounter.addAndGet(sources.size());
+ this.edgeIterCounter.addAndGet(neighbors.size());
+
+ traverseIdsByBfs(sources.iterator(), dir, labelId, degree,
capacity, consumer);
+
+ sources.clear();
+
if (capacity != NO_LIMIT) {
// Update 'remaining' value to record remaining capacity
- remaining -= latest.size();
+ remaining -= neighbors.size();
if (remaining <= 0 && depth > 0) {
throw new HugeException(
@@ -94,7 +108,7 @@ public class KoutTraverser extends OltpTraverser {
}
}
- return latest;
+ return neighbors;
}
public KoutRecords customizedKout(Id source, Steps steps,
@@ -107,33 +121,25 @@ public class KoutTraverser extends OltpTraverser {
checkLimit(limit);
long[] depth = new long[1];
depth[0] = maxDepth;
- boolean concurrent = maxDepth >= this.concurrentDepth();
- KoutRecords records = new KoutRecords(concurrent, source, nearest, 0);
+ KoutRecords records = new KoutRecords(true, source, nearest, 0);
- Consumer<Id> consumer = v -> {
+ Consumer<Edge> consumer = edge -> {
if (this.reachLimit(limit, depth[0], records.size())) {
return;
}
- Iterator<Edge> edges = edgesOfVertex(v, steps);
- this.vertexIterCounter.addAndGet(1L);
- while (!this.reachLimit(limit, depth[0], records.size()) &&
- edges.hasNext()) {
- HugeEdge edge = (HugeEdge) edges.next();
- Id target = edge.id().otherVertexId();
- records.addPath(v, target);
- this.checkCapacity(capacity, records.accessed(), depth[0]);
-
- records.edgeResults().addEdge(v, target, edge);
-
- this.edgeIterCounter.addAndGet(1L);
- }
+ EdgeId edgeId = ((HugeEdge) edge).id();
+ records.addPath(edgeId.ownerVertexId(), edgeId.otherVertexId());
+ records.edgeResults().addEdge(edgeId.ownerVertexId(),
edgeId.otherVertexId(), edge);
};
while (depth[0]-- > 0) {
+ List<Id> sources = records.ids(Query.NO_LIMIT);
records.startOneLayer(true);
- this.traverseIds(records.keys(), consumer, concurrent);
+ traverseIdsByBfs(sources.iterator(), steps, capacity, consumer);
+ this.vertexIterCounter.addAndGet(sources.size());
records.finishOneLayer();
+ checkCapacity(capacity, records.accessed(), depth[0]);
}
return records;
}
diff --git
a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java
b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java
index b05de2422..c05d8f89f 100644
---
a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java
+++
b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/OltpTraverser.java
@@ -17,24 +17,36 @@
package org.apache.hugegraph.traversal.algorithm;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
-import com.google.common.base.Objects;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hugegraph.HugeGraph;
+import org.apache.hugegraph.backend.id.EdgeId;
import org.apache.hugegraph.backend.id.Id;
+import org.apache.hugegraph.backend.query.EdgesQueryIterator;
import org.apache.hugegraph.config.CoreOptions;
+import org.apache.hugegraph.iterator.FilterIterator;
+import org.apache.hugegraph.iterator.MapperIterator;
+import org.apache.hugegraph.structure.HugeEdge;
+import org.apache.hugegraph.traversal.algorithm.steps.Steps;
+import org.apache.hugegraph.type.define.Directions;
import org.apache.hugegraph.util.Consumers;
+import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.Property;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.util.CloseableIterator;
-import org.apache.hugegraph.iterator.FilterIterator;
+import com.google.common.base.Objects;
public abstract class OltpTraverser extends HugeTraverser
implements AutoCloseable {
@@ -75,7 +87,7 @@ public abstract class OltpTraverser extends HugeTraverser
protected long traversePairs(Iterator<Pair<Id, Id>> pairs,
Consumer<Pair<Id, Id>> consumer) {
- return this.traverse(pairs, consumer, "traverse-pairs");
+ return this.traverseByOne(pairs, consumer, "traverse-pairs");
}
protected long traverseIds(Iterator<Id> ids, Consumer<Id> consumer,
@@ -93,18 +105,19 @@ public abstract class OltpTraverser extends HugeTraverser
}
protected long traverseIds(Iterator<Id> ids, Consumer<Id> consumer) {
- return this.traverse(ids, consumer, "traverse-ids");
+ return this.traverseByOne(ids, consumer, "traverse-ids");
}
- protected <K> long traverse(Iterator<K> iterator, Consumer<K> consumer,
- String name) {
+ protected <K> long traverseByOne(Iterator<K> iterator,
+ Consumer<K> consumer,
+ String taskName) {
if (!iterator.hasNext()) {
return 0L;
}
Consumers<K> consumers = new Consumers<>(executors.getExecutor(),
consumer, null);
- consumers.start(name);
+ consumers.start(taskName);
long total = 0L;
try {
while (iterator.hasNext()) {
@@ -129,11 +142,101 @@ public abstract class OltpTraverser extends HugeTraverser
return total;
}
+ protected void traverseIdsByBfs(Iterator<Id> vertices,
+ Directions dir,
+ Id label,
+ long degree,
+ long capacity,
+ Consumer<EdgeId> consumer) {
+ List<Id> labels = label == null ? Collections.emptyList() :
+ Collections.singletonList(label);
+ OneStepEdgeIterConsumer edgeIterConsumer = new
OneStepEdgeIterConsumer(consumer, capacity);
+
+ EdgesIterator edgeIter = edgesOfVertices(vertices, dir, labels,
degree);
+
+ // parallel out-of-order execution
+ this.traverseByBatch(edgeIter, edgeIterConsumer, "traverse-bfs-step",
1);
+ }
+
+ protected void traverseIdsByBfs(Iterator<Id> vertices,
+ Steps steps,
+ long capacity,
+ Consumer<Edge> consumer) {
+ StepsEdgeIterConsumer edgeIterConsumer =
+ new StepsEdgeIterConsumer(consumer, capacity, steps);
+
+ EdgesQueryIterator queryIterator = new EdgesQueryIterator(vertices,
+
steps.direction(),
+
steps.edgeLabels(),
+
steps.degree());
+
+ // get Iterator<Iterator<edges>> from Iterator<Query>
+ EdgesIterator edgeIter = new EdgesIterator(queryIterator);
+
+ // parallel out-of-order execution
+ this.traverseByBatch(edgeIter, edgeIterConsumer, "traverse-bfs-steps",
1);
+ }
+
+ protected <K> long traverseByBatch(Iterator<Iterator<K>> sources,
+ Consumer<Iterator<K>> consumer,
+ String taskName, int concurrentWorkers)
{
+ if (!sources.hasNext()) {
+ return 0L;
+ }
+ AtomicBoolean done = new AtomicBoolean(false);
+ Consumers<Iterator<K>> consumers = null;
+ try {
+ consumers = buildConsumers(consumer, concurrentWorkers, done,
+ executors.getExecutor());
+ return startConsumers(sources, taskName, done, consumers);
+ } finally {
+ assert consumers != null;
+ executors.returnExecutor(consumers.executor());
+ }
+ }
+
+ private <K> long startConsumers(Iterator<Iterator<K>> sources,
+ String taskName,
+ AtomicBoolean done,
+ Consumers<Iterator<K>> consumers) {
+ long total = 0L;
+ try {
+ consumers.start(taskName);
+ while (sources.hasNext() && !done.get()) {
+ total++;
+ Iterator<K> v = sources.next();
+ consumers.provide(v);
+ }
+ } catch (Consumers.StopExecution e) {
+ // pass
+ } catch (Throwable e) {
+ throw Consumers.wrapException(e);
+ } finally {
+ try {
+ consumers.await();
+ } catch (Throwable e) {
+ throw Consumers.wrapException(e);
+ } finally {
+ CloseableIterator.closeIterator(sources);
+ }
+ }
+ return total;
+ }
+
+ private <K> Consumers<Iterator<K>> buildConsumers(Consumer<Iterator<K>>
consumer,
+ int queueSizePerWorker,
+ AtomicBoolean done,
+ ExecutorService
executor) {
+ return new Consumers<>(executor,
+ consumer,
+ null,
+ e -> done.set(true),
+ queueSizePerWorker);
+ }
+
protected Iterator<Vertex> filter(Iterator<Vertex> vertices,
String key, Object value) {
- return new FilterIterator<>(vertices, vertex -> {
- return match(vertex, key, value);
- });
+ return new FilterIterator<>(vertices, vertex -> match(vertex, key,
value));
}
protected boolean match(Element elem, String key, Object value) {
@@ -175,4 +278,104 @@ public abstract class OltpTraverser extends HugeTraverser
return values;
}
}
+
+ public static class ConcurrentVerticesConsumer implements Consumer<EdgeId>
{
+
+ private final Id sourceV;
+ private final Set<Id> excluded;
+ private final Set<Id> neighbors;
+ private final long limit;
+ private final AtomicInteger count;
+
+ public ConcurrentVerticesConsumer(Id sourceV, Set<Id> excluded, long
limit,
+ Set<Id> neighbors) {
+ this.sourceV = sourceV;
+ this.excluded = excluded;
+ this.limit = limit;
+ this.neighbors = neighbors;
+ this.count = new AtomicInteger(0);
+ }
+
+ @Override
+ public void accept(EdgeId edgeId) {
+ if (this.limit != NO_LIMIT && count.get() >= this.limit) {
+ throw new Consumers.StopExecution("reach limit");
+ }
+
+ Id targetV = edgeId.otherVertexId();
+ if (this.sourceV.equals(targetV)) {
+ return;
+ }
+
+ if (this.excluded != null && this.excluded.contains(targetV)) {
+ return;
+ }
+
+ if (this.neighbors.add(targetV)) {
+ if (this.limit != NO_LIMIT) {
+ this.count.getAndIncrement();
+ }
+ }
+ }
+ }
+
+ public abstract class EdgesConsumer<T, E> implements Consumer<Iterator<T>>
{
+
+ private final Consumer<E> consumer;
+ private final long capacity;
+
+ public EdgesConsumer(Consumer<E> consumer, long capacity) {
+ this.consumer = consumer;
+ this.capacity = capacity;
+ }
+
+ protected abstract Iterator<E> prepare(Iterator<T> iter);
+
+ @Override
+ public void accept(Iterator<T> edgeIter) {
+ Iterator<E> ids = prepare(edgeIter);
+ long counter = 0;
+ while (ids.hasNext()) {
+ if (Thread.currentThread().isInterrupted()) {
+ LOG.warn("Consumer is Interrupted");
+ break;
+ }
+ counter++;
+ this.consumer.accept(ids.next());
+ }
+ long total = edgeIterCounter.addAndGet(counter);
+ // traverse by batch & improve performance
+ if (this.capacity != NO_LIMIT && total >= this.capacity) {
+ throw new Consumers.StopExecution("reach capacity");
+ }
+ }
+ }
+
+ public class OneStepEdgeIterConsumer extends EdgesConsumer<Edge, EdgeId> {
+
+ public OneStepEdgeIterConsumer(Consumer<EdgeId> consumer, long
capacity) {
+ super(consumer, capacity);
+ }
+
+ @Override
+ protected Iterator<EdgeId> prepare(Iterator<Edge> edgeIter) {
+ return new MapperIterator<>(edgeIter, (e) -> ((HugeEdge) e).id());
+ }
+ }
+
+ public class StepsEdgeIterConsumer extends EdgesConsumer<Edge, Edge> {
+
+ private final Steps steps;
+
+ public StepsEdgeIterConsumer(Consumer<Edge> consumer, long capacity,
+ Steps steps) {
+ super(consumer, capacity);
+ this.steps = steps;
+ }
+
+ @Override
+ protected Iterator<Edge> prepare(Iterator<Edge> edgeIter) {
+ return edgesOfVertexStep(edgeIter, this.steps);
+ }
+ }
}
diff --git
a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/records/KneighborRecords.java
b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/records/KneighborRecords.java
index 7e04a286c..649b1c211 100644
---
a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/records/KneighborRecords.java
+++
b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/records/KneighborRecords.java
@@ -19,7 +19,9 @@ package org.apache.hugegraph.traversal.algorithm.records;
import static org.apache.hugegraph.traversal.algorithm.HugeTraverser.NO_LIMIT;
+import java.util.Collection;
import java.util.List;
+import java.util.Set;
import java.util.Stack;
import org.apache.hugegraph.backend.id.Id;
@@ -45,6 +47,17 @@ public class KneighborRecords extends
SingleWayMultiPathsRecords {
@Override
public List<Id> ids(long limit) {
List<Id> ids = CollectionFactory.newList(CollectionType.EC);
+ this.getRecords(limit, ids);
+ return ids;
+ }
+
+ public Set<Id> idsBySet(long limit) {
+ Set<Id> ids = CollectionFactory.newSet(CollectionType.EC);
+ this.getRecords(limit, ids);
+ return ids;
+ }
+
+ private void getRecords(long limit, Collection<Id> ids) {
Stack<Record> records = this.records();
// Not include record(i=0) to ignore source vertex
for (int i = 1; i < records.size(); i++) {
@@ -54,7 +67,6 @@ public class KneighborRecords extends
SingleWayMultiPathsRecords {
limit--;
}
}
- return ids;
}
@Override
diff --git
a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/steps/Steps.java
b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/steps/Steps.java
index d1a9238be..c2a1a7e1e 100644
---
a/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/steps/Steps.java
+++
b/hugegraph-core/src/main/java/org/apache/hugegraph/traversal/algorithm/steps/Steps.java
@@ -138,6 +138,10 @@ public class Steps {
return new ArrayList<>(this.edgeSteps.keySet());
}
+ public boolean isEdgeEmpty() {
+ return this.edgeSteps.isEmpty();
+ }
+
public boolean isVertexEmpty() {
return this.vertexSteps.isEmpty();
}
diff --git
a/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java
b/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java
index 00689e0c5..06e678fd9 100644
--- a/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java
+++ b/hugegraph-core/src/main/java/org/apache/hugegraph/util/Consumers.java
@@ -27,16 +27,16 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
-import org.apache.hugegraph.config.CoreOptions;
-import org.slf4j.Logger;
-
import org.apache.hugegraph.HugeException;
+import org.apache.hugegraph.config.CoreOptions;
import org.apache.hugegraph.task.TaskManager.ContextCallable;
+import org.slf4j.Logger;
public final class Consumers<V> {
@@ -46,16 +46,16 @@ public final class Consumers<V> {
private static final Logger LOG = Log.logger(Consumers.class);
+ private final V QUEUE_END = (V) new Object();
private final ExecutorService executor;
private final Consumer<V> consumer;
- private final Runnable done;
-
+ private final Runnable doneHandle;
+ private final Consumer<Throwable> exceptionHandle;
private final int workers;
+ private final List<Future> runningFutures;
private final int queueSize;
private final CountDownLatch latch;
private final BlockingQueue<V> queue;
-
- private volatile boolean ending = false;
private volatile Throwable exception = null;
public Consumers(ExecutorService executor, Consumer<V> consumer) {
@@ -63,23 +63,40 @@ public final class Consumers<V> {
}
public Consumers(ExecutorService executor,
- Consumer<V> consumer, Runnable done) {
+ Consumer<V> consumer, Runnable doneHandle) {
+ this(executor, consumer, doneHandle, QUEUE_WORKER_SIZE);
+ }
+
+ public Consumers(ExecutorService executor,
+ Consumer<V> consumer,
+ Runnable doneHandle,
+ int queueSizePerWorker) {
+ this(executor, consumer, doneHandle, null, queueSizePerWorker);
+ }
+
+ public Consumers(ExecutorService executor,
+ Consumer<V> consumer,
+ Runnable doneHandle,
+ Consumer<Throwable> exceptionHandle,
+ int queueSizePerWorker) {
this.executor = executor;
this.consumer = consumer;
- this.done = done;
+ this.doneHandle = doneHandle;
+ this.exceptionHandle = exceptionHandle;
int workers = THREADS;
if (this.executor instanceof ThreadPoolExecutor) {
workers = ((ThreadPoolExecutor) this.executor).getCorePoolSize();
}
this.workers = workers;
- this.queueSize = QUEUE_WORKER_SIZE * workers;
+
+ this.runningFutures = new ArrayList<>(workers);
+ this.queueSize = queueSizePerWorker * workers + 1;
this.latch = new CountDownLatch(workers);
this.queue = new ArrayBlockingQueue<>(this.queueSize);
}
public void start(String name) {
- this.ending = false;
this.exception = null;
if (this.executor == null) {
return;
@@ -87,7 +104,8 @@ public final class Consumers<V> {
LOG.info("Starting {} workers[{}] with queue size {}...",
this.workers, name, this.queueSize);
for (int i = 0; i < this.workers; i++) {
- this.executor.submit(new ContextCallable<>(this::runAndDone));
+ this.runningFutures.add(
+ this.executor.submit(new
ContextCallable<>(this::runAndDone)));
}
}
@@ -95,11 +113,15 @@ public final class Consumers<V> {
try {
this.run();
} catch (Throwable e) {
- // Only the first exception of one thread can be stored
- this.exception = e;
- if (!(e instanceof StopExecution)) {
+ if (e instanceof StopExecution) {
+ this.queue.clear();
+ putQueueEnd();
+ } else {
+ // Only the first exception to one thread can be stored
+ this.exception = e;
LOG.error("Error when running task", e);
}
+ exceptionHandle(e);
} finally {
this.done();
this.latch.countDown();
@@ -109,11 +131,7 @@ public final class Consumers<V> {
private void run() {
LOG.debug("Start to work...");
- while (!this.ending) {
- this.consume();
- }
- assert this.ending;
- while (this.consume()){
+ while (this.consume()) {
// ignore
}
@@ -121,14 +139,18 @@ public final class Consumers<V> {
}
private boolean consume() {
- V elem;
- try {
- elem = this.queue.poll(CONSUMER_WAKE_PERIOD,
TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- // ignore
- return true;
+ V elem = null;
+ while (elem == null) {
+ try {
+ elem = this.queue.poll(CONSUMER_WAKE_PERIOD,
TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ // ignore
+ return false;
+ }
}
- if (elem == null) {
+
+ if (elem == QUEUE_END) {
+ putQueueEnd();
return false;
}
// do job
@@ -136,13 +158,29 @@ public final class Consumers<V> {
return true;
}
+ private void exceptionHandle(Throwable e) {
+ if (this.exceptionHandle == null) {
+ return;
+ }
+
+ try {
+ this.exceptionHandle.accept(e);
+ } catch (Throwable ex) {
+ if (this.exception == null) {
+ this.exception = ex;
+ } else {
+ LOG.warn("Error while calling exceptionHandle()", ex);
+ }
+ }
+ }
+
private void done() {
- if (this.done == null) {
+ if (this.doneHandle == null) {
return;
}
try {
- this.done.run();
+ this.doneHandle.run();
} catch (Throwable e) {
if (this.exception == null) {
this.exception = e;
@@ -169,6 +207,16 @@ public final class Consumers<V> {
} else {
try {
this.queue.put(v);
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted while enqueue", e);
+ }
+ }
+ }
+
+ private void putQueueEnd() {
+ if (this.executor != null) {
+ try {
+ this.queue.put(QUEUE_END);
} catch (InterruptedException e) {
LOG.warn("Interrupted while enqueue", e);
}
@@ -176,15 +224,18 @@ public final class Consumers<V> {
}
public void await() throws Throwable {
- this.ending = true;
if (this.executor == null) {
// call done() directly if without thread pool
this.done();
} else {
try {
+ putQueueEnd();
this.latch.await();
} catch (InterruptedException e) {
String error = "Interrupted while waiting for consumers";
+ for (Future f : this.runningFutures) {
+ f.cancel(true);
+ }
this.exception = new HugeException(error, e);
LOG.warn(error, e);
}
@@ -201,7 +252,8 @@ public final class Consumers<V> {
public static void executeOncePerThread(ExecutorService executor,
int totalThreads,
- Runnable callback)
+ Runnable callback,
+ long invokeTimeout)
throws InterruptedException {
// Ensure callback execute at least once for every thread
final Map<Thread, Integer> threadsTimes = new ConcurrentHashMap<>();
@@ -230,7 +282,7 @@ public final class Consumers<V> {
for (int i = 0; i < totalThreads; i++) {
tasks.add(task);
}
- executor.invokeAll(tasks);
+ executor.invokeAll(tasks, invokeTimeout, TimeUnit.SECONDS);
}
public static ExecutorService newThreadPool(String prefix, int workers) {
@@ -290,13 +342,21 @@ public final class Consumers<V> {
public synchronized void returnExecutor(ExecutorService executor) {
E.checkNotNull(executor, "executor");
if (!this.executors.offer(executor)) {
- executor.shutdown();
+ try {
+ executor.shutdown();
+ } catch (Exception e) {
+ LOG.warn("Failed to close ExecutorService:", e);
+ }
}
}
public synchronized void destroy() {
for (ExecutorService executor : this.executors) {
- executor.shutdown();
+ try {
+ executor.shutdownNow();
+ } catch (Exception e) {
+ LOG.warn("Failed to close ExecutorService:", e);
+ }
}
this.executors.clear();
}
diff --git
a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java
b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java
index 2dba5fa76..283baa622 100644
---
a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java
+++
b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java
@@ -44,9 +44,6 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
-import org.rocksdb.RocksDBException;
-import org.slf4j.Logger;
-
import org.apache.hugegraph.HugeException;
import org.apache.hugegraph.backend.BackendException;
import org.apache.hugegraph.backend.id.Id;
@@ -69,6 +66,9 @@ import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.ExecutorUtil;
import org.apache.hugegraph.util.InsertionOrderUtil;
import org.apache.hugegraph.util.Log;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+
import com.google.common.collect.ImmutableList;
public abstract class RocksDBStore extends
AbstractBackendStore<RocksDBSessions.Session> {
@@ -93,7 +93,8 @@ public abstract class RocksDBStore extends
AbstractBackendStore<RocksDBSessions.
private static final String TABLE_GENERAL_KEY = "general";
private static final String DB_OPEN = "db-open-%s";
- private static final long OPEN_TIMEOUT = 600L;
+ private static final long DB_OPEN_TIMEOUT = 600L; // unit: seconds
+ private static final long DB_CLOSE_TIMEOUT = 30L; // unit: seconds
/*
* This is threads number used to concurrently opening RocksDB dbs,
* 8 is supposed enough due to configurable data disks and
@@ -279,7 +280,7 @@ public abstract class RocksDBStore extends
AbstractBackendStore<RocksDBSessions.
this.useSessions();
try {
Consumers.executeOncePerThread(openPool, OPEN_POOL_THREADS,
- this::closeSessions);
+ this::closeSessions,
DB_CLOSE_TIMEOUT);
} catch (InterruptedException e) {
throw new BackendException("Failed to close session opened by " +
"open-pool");
@@ -288,7 +289,7 @@ public abstract class RocksDBStore extends
AbstractBackendStore<RocksDBSessions.
boolean terminated;
openPool.shutdown();
try {
- terminated = openPool.awaitTermination(OPEN_TIMEOUT,
+ terminated = openPool.awaitTermination(DB_OPEN_TIMEOUT,
TimeUnit.SECONDS);
} catch (Throwable e) {
throw new BackendException(