This is an automated email from the ASF dual-hosted git repository.

yangjiaqi pushed a commit to branch pd-store-jacky
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git

commit 65c497927d476e299978158340a5e945450ca726
Author: JackyYangPassion <[email protected]>
AuthorDate: Fri Mar 15 14:45:45 2024 +0800

    add comment
---
 .../src/assembly/static/bin/hugegraph-server.sh        |  2 +-
 .../hugegraph/backend/store/hstore/HstoreTable.java    |  4 ++--
 .../java/org/apache/hugegraph/store/HgKvStore.java     |  4 ++++
 .../hugegraph/store/client/NodeTxSessionProxy.java     |  4 ++--
 .../java/org/apache/hugegraph/store/HgStoreEngine.java |  6 +++---
 .../org/apache/hugegraph/store/PartitionEngine.java    |  2 +-
 .../hugegraph/store/business/BusinessHandlerImpl.java  |  2 +-
 .../src/assembly/static/bin/start-hugegraph-store.sh   |  2 +-
 .../hugegraph/store/node/controller/PartitionAPI.java  |  2 +-
 .../hugegraph/store/node/grpc/HgStoreNodeService.java  |  2 +-
 .../hugegraph/store/node/grpc/HgStoreSessionImpl.java  | 18 ++++++++++++++----
 .../apache/hugegraph/store/boot/StoreNodeServer00.java |  9 +++++++++
 .../hugegraph/rocksdb/access/SessionOperatorImpl.java  |  4 ++--
 13 files changed, 42 insertions(+), 19 deletions(-)

diff --git a/hugegraph-server/hugegraph-dist/src/assembly/static/bin/hugegraph-server.sh b/hugegraph-server/hugegraph-dist/src/assembly/static/bin/hugegraph-server.sh
index 6777939a7..e05c33eea 100644
--- a/hugegraph-server/hugegraph-dist/src/assembly/static/bin/hugegraph-server.sh
+++ b/hugegraph-server/hugegraph-dist/src/assembly/static/bin/hugegraph-server.sh
@@ -151,7 +151,7 @@ esac
 
 JVM_OPTIONS="-Dlog4j.configurationFile=${CONF}/log4j2.xml"
 if [[ ${OPEN_SECURITY_CHECK} == "true" ]]; then
-    JVM_OPTIONS="${JVM_OPTIONS} -Djava.security.manager=org.apache.hugegraph.security.HugeSecurityManager"
+    JVM_OPTIONS="${JVM_OPTIONS} -Xdebug -Xrunjdwp:transport=dt_socket,address=8001,server=y,suspend=n -Djava.security.manager=org.apache.hugegraph.security.HugeSecurityManager"
 fi
 
 # Turn on security check
diff --git a/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreTable.java b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreTable.java
index 39e24a1d9..72841e4f9 100755
--- a/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreTable.java
+++ b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreTable.java
@@ -187,7 +187,7 @@ public class HstoreTable extends BackendTable<Session, BackendEntry> {
     }
 
     public byte[] getInsertOwner(BackendEntry entry) {
-        // To spread label index entries by hash instead of concentrating them in one partition
+        // To spread label index entries by hash instead of concentrating them in one partition. TODO: this logic does not appear to have any real effect
         if (entry.type().isLabelIndex() && (entry.columns().size() == 1)) {
             Iterator<BackendColumn> iterator = entry.columns().iterator();
             while (iterator.hasNext()) {
@@ -235,7 +235,7 @@ public class HstoreTable extends BackendTable<Session, BackendEntry> {
 
     @Override
     public void insert(Session session, BackendEntry entry) {
-        byte[] owner = entry.type().isEdge() ? getInsertEdgeOwner(entry) : getInsertOwner(entry);
+        byte[] owner = entry.type().isEdge() ? getInsertEdgeOwner(entry) : getInsertOwner(entry);// OwnerKey definition for vertices and edges
         ArrayList<BackendColumn> columns = new ArrayList<>(entry.columns());
         for (int i = 0; i < columns.size(); i++) {
             BackendColumn col = columns.get(i);
diff --git a/hugegraph-store/hg-store-client/src/main/java/org/apache/hugegraph/store/HgKvStore.java b/hugegraph-store/hg-store-client/src/main/java/org/apache/hugegraph/store/HgKvStore.java
index df29ad56f..ea6c425c7 100644
--- a/hugegraph-store/hg-store-client/src/main/java/org/apache/hugegraph/store/HgKvStore.java
+++ b/hugegraph-store/hg-store-client/src/main/java/org/apache/hugegraph/store/HgKvStore.java
@@ -44,6 +44,10 @@ public interface HgKvStore {
     /**
      * This version is used internally by the store. It writes data to a partition;
      * partitionId and key.keyCode must be consistent with the partition information stored in PD.
+     *
+     * During direct writes, the same key was observed being written to different partitions, causing duplicate data.
+     *
+     *
      */
     boolean directPut(String table, int partitionId, HgOwnerKey key, byte[] value);
 
diff --git a/hugegraph-store/hg-store-client/src/main/java/org/apache/hugegraph/store/client/NodeTxSessionProxy.java b/hugegraph-store/hg-store-client/src/main/java/org/apache/hugegraph/store/client/NodeTxSessionProxy.java
index eccc25509..5dc7cbac0 100644
--- a/hugegraph-store/hg-store-client/src/main/java/org/apache/hugegraph/store/client/NodeTxSessionProxy.java
+++ b/hugegraph-store/hg-store-client/src/main/java/org/apache/hugegraph/store/client/NodeTxSessionProxy.java
@@ -665,13 +665,13 @@ class NodeTxSessionProxy implements HgStoreSession {
     public boolean doAction(String table, HgOwnerKey startKey, HgOwnerKey endKey,
                             Function<NodeTkv, Boolean> action) {
         Collection<HgNodePartition> partitions =
-                doPartition(table, startKey.getOwner(), endKey.getOwner());
+                doPartition(table, startKey.getOwner(), endKey.getOwner());// the partition_id obtained here differs between different connections
         for (HgNodePartition partition : partitions) {
             HgStoreNode storeNode = this.getStoreNode(partition.getNodeId());
             HgStoreSession session = this.txExecutor.openNodeSession(storeNode);
             NodeTkv data = new NodeTkv(partition, table, startKey, endKey);
             data.setSession(session);
-            if (!action.apply(data)) {
+            if (!action.apply(data)) {// which callback is action.apply using here? (check the call stack to see which callback was passed in)
                 return false;
             }
         }
diff --git a/hugegraph-store/hg-store-core/src/main/java/org/apache/hugegraph/store/HgStoreEngine.java b/hugegraph-store/hg-store-core/src/main/java/org/apache/hugegraph/store/HgStoreEngine.java
index ff2a62862..57fccbfa9 100644
--- a/hugegraph-store/hg-store-core/src/main/java/org/apache/hugegraph/store/HgStoreEngine.java
+++ b/hugegraph-store/hg-store-core/src/main/java/org/apache/hugegraph/store/HgStoreEngine.java
@@ -73,7 +73,7 @@ public class HgStoreEngine implements Lifecycle<HgStoreEngineOptions>, HgStoreSt
     private RpcServer rpcServer;
     private HgStoreEngineOptions options;
     private PdProvider pdProvider;
-    private HgCmdClient hgCmdClient;
+    private HgCmdClient hgCmdClient;// Raft peers are created via RPC calls made through HgCmdClient
     private PartitionManager partitionManager;
     private HeartbeatService heartbeatService;
     private BusinessHandler businessHandler;
@@ -313,7 +313,7 @@ public class HgStoreEngine implements Lifecycle<HgStoreEngineOptions>, HgStoreSt
 
         if ((engine = partitionEngines.get(groupId)) == null) {
             log.info("createPartitionEngine {}, with shards: {}", groupId, shardGroup);
-
+            // create the PartitionEngine
             engine = new PartitionEngine(this, shardGroup);
             PartitionEngineOptions ptOpts = new PartitionEngineOptions();
             if (conf != null) {
@@ -347,7 +347,7 @@ public class HgStoreEngine implements Lifecycle<HgStoreEngineOptions>, HgStoreSt
      * 1. Iterate over partition.shards
      * 2. Look up the Store information by storeId
      * 3. Establish raft rpc to the other stores and send the StartRaft message
-     *
+     * via hgCmdClient
      * @param partition
      * @return
      */
diff --git a/hugegraph-store/hg-store-core/src/main/java/org/apache/hugegraph/store/PartitionEngine.java b/hugegraph-store/hg-store-core/src/main/java/org/apache/hugegraph/store/PartitionEngine.java
index bae312912..55556b0d2 100644
--- a/hugegraph-store/hg-store-core/src/main/java/org/apache/hugegraph/store/PartitionEngine.java
+++ b/hugegraph-store/hg-store-core/src/main/java/org/apache/hugegraph/store/PartitionEngine.java
@@ -440,7 +440,7 @@ public class PartitionEngine implements Lifecycle<PartitionEngineOptions>, RaftS
         final Task task = new Task();
         task.setData(ByteBuffer.wrap(operation.getValues()));
         task.setDone(new HgStoreStateMachine.RaftClosureAdapter(operation, closure));
-        this.raftNode.apply(task);
+        this.raftNode.apply(task);// again a callback here: how does the write end up in RocksDB?
     }
 
     @Override
diff --git a/hugegraph-store/hg-store-core/src/main/java/org/apache/hugegraph/store/business/BusinessHandlerImpl.java b/hugegraph-store/hg-store-core/src/main/java/org/apache/hugegraph/store/business/BusinessHandlerImpl.java
index 2c9ba52ef..b5e1c3b70 100644
--- a/hugegraph-store/hg-store-core/src/main/java/org/apache/hugegraph/store/business/BusinessHandlerImpl.java
+++ b/hugegraph-store/hg-store-core/src/main/java/org/apache/hugegraph/store/business/BusinessHandlerImpl.java
@@ -73,7 +73,7 @@ import lombok.extern.slf4j.Slf4j;
 public class BusinessHandlerImpl implements BusinessHandler {
 
     private static final int batchSize = 10000;
-    private static final RocksDBFactory factory = RocksDBFactory.getInstance();
+    private static final RocksDBFactory factory = RocksDBFactory.getInstance();// RocksDB instance creation
     private static final HashMap<ScanType, String> tableMapping = new HashMap<>() {{
         put(ScanType.SCAN_VERTEX, tableVertex);
         put(ScanType.SCAN_EDGE, tableOutEdge);
diff --git a/hugegraph-store/hg-store-dist/src/assembly/static/bin/start-hugegraph-store.sh b/hugegraph-store/hg-store-dist/src/assembly/static/bin/start-hugegraph-store.sh
index 0b069a4c0..f33f912e3 100644
--- a/hugegraph-store/hg-store-dist/src/assembly/static/bin/start-hugegraph-store.sh
+++ b/hugegraph-store/hg-store-dist/src/assembly/static/bin/start-hugegraph-store.sh
@@ -144,7 +144,7 @@ case "$GC_OPTION" in
     exit 1
 esac
 
-JVM_OPTIONS="-Dlog4j.configurationFile=${CONF}/log4j2.xml -Dfastjson.parser.safeMode=true"
+JVM_OPTIONS="-Dlog4j.configurationFile=${CONF}/log4j2.xml -Dfastjson.parser.safeMode=true -Xdebug -Xrunjdwp:transport=dt_socket,address=8002,server=y,suspend=n"
 #if [ "${JMX_EXPORT_PORT}" != "" ] && [ ${JMX_EXPORT_PORT} -ne 0 ] ; then
 #    JAVA_OPTIONS="${JAVA_OPTIONS} -javaagent:${LIB}/jmx_prometheus_javaagent-0.16.1.jar=${JMX_EXPORT_PORT}:${CONF}/jmx_exporter.yml"
 #fi
diff --git a/hugegraph-store/hg-store-node/src/main/java/org/apache/hugegraph/store/node/controller/PartitionAPI.java b/hugegraph-store/hg-store-node/src/main/java/org/apache/hugegraph/store/node/controller/PartitionAPI.java
index 3b12dbf38..4d50f647b 100644
--- a/hugegraph-store/hg-store-node/src/main/java/org/apache/hugegraph/store/node/controller/PartitionAPI.java
+++ b/hugegraph-store/hg-store-node/src/main/java/org/apache/hugegraph/store/node/controller/PartitionAPI.java
@@ -104,7 +104,7 @@ public class PartitionAPI {
                 partition.setLeader(pt.isLeader() == engine.isLeader() ? "OK" : "Error");
                 raft.getPartitions().add(partition);
             }
-            rafts.add(raft);
+            rafts.add(raft);// assemble the response info
         }
 
         return okMap("partitions", rafts);
diff --git a/hugegraph-store/hg-store-node/src/main/java/org/apache/hugegraph/store/node/grpc/HgStoreNodeService.java b/hugegraph-store/hg-store-node/src/main/java/org/apache/hugegraph/store/node/grpc/HgStoreNodeService.java
index 86239bf3d..1c53e6bdb 100644
--- a/hugegraph-store/hg-store-node/src/main/java/org/apache/hugegraph/store/node/grpc/HgStoreNodeService.java
+++ b/hugegraph-store/hg-store-node/src/main/java/org/apache/hugegraph/store/node/grpc/HgStoreNodeService.java
@@ -65,7 +65,7 @@ public class HgStoreNodeService implements RaftTaskHandler {
     public static final byte MAX_OP = 0x59;
     private final AppConfig appConfig;
     @Autowired
-    HgStoreSessionImpl hgStoreSession;
+    HgStoreSessionImpl hgStoreSession;// HgStoreSessionImpl is the concrete implementation of the HgStoreSession service
    private HgStoreEngine storeEngine;
 
     public HgStoreNodeService(@Autowired AppConfig appConfig) {
diff --git a/hugegraph-store/hg-store-node/src/main/java/org/apache/hugegraph/store/node/grpc/HgStoreSessionImpl.java b/hugegraph-store/hg-store-node/src/main/java/org/apache/hugegraph/store/node/grpc/HgStoreSessionImpl.java
index aff054917..0348d6568 100644
--- a/hugegraph-store/hg-store-node/src/main/java/org/apache/hugegraph/store/node/grpc/HgStoreSessionImpl.java
+++ b/hugegraph-store/hg-store-node/src/main/java/org/apache/hugegraph/store/node/grpc/HgStoreSessionImpl.java
@@ -62,13 +62,23 @@ import com.google.protobuf.ByteString;
 import io.grpc.stub.StreamObserver;
 import lombok.extern.slf4j.Slf4j;
 
+/**
+ *
+ * 1. HgStoreSession: what is mainly implemented here is:
+ *    the PB file store_session.proto:
+ *    its corresponding service definition: HgStoreSession
+ *
+ *    i.e. the server-side implementation logic
+ *
+ *
+ */
 @Slf4j
 @GRpcService
 public class HgStoreSessionImpl extends HgStoreSessionGrpc.HgStoreSessionImplBase {
 
     @Autowired()
     private AppConfig appConfig;
     @Autowired
-    private HgStoreNodeService storeService;
+    private HgStoreNodeService storeService;// the concrete RocksDB instance, used for read/write operations
     private HgStoreWrapperEx wrapper;
     private PdProvider pdProvider;
@@ -210,7 +220,7 @@ public class HgStoreSessionImpl extends HgStoreSessionGrpc.HgStoreSessionImplBas
     @Override
     public void batch(BatchReq request, StreamObserver<FeedbackRes> observer) {
         String graph = request.getHeader().getGraph();
-        List<BatchEntry> list = request.getWriteReq().getEntryList();
+        List<BatchEntry> list = request.getWriteReq().getEntryList();// the structure of BatchEntry is defined in the PB file
         PdProvider pd = getPD();
         try {
             GraphManager graphManager = pd.getGraphManager();
@@ -261,7 +271,7 @@ public class HgStoreSessionImpl extends HgStoreSessionGrpc.HgStoreSessionImplBas
                 return;
             }
 
-            // split the data by partition
+            // split the data by partition: group the BatchEntry list by partition_id
             Map<Integer, List<BatchEntry>> groups = new HashMap<>();
             list.forEach((entry) -> {
                 Key startKey = entry.getStartKey();
@@ -287,7 +297,7 @@ public class HgStoreSessionImpl extends HgStoreSessionGrpc.HgStoreSessionImplBas
                 }
             });
 
-            // dispatch to the different raft groups for execution
+            // dispatch to the different raft groups for execution: pay special attention to storeService here
            BatchGrpcClosure<FeedbackRes> closure = new BatchGrpcClosure<>(groups.size());
 
            groups.forEach((partition, entries) -> {
diff --git a/hugegraph-store/hg-store-node/src/test/java/org/apache/hugegraph/store/boot/StoreNodeServer00.java b/hugegraph-store/hg-store-node/src/test/java/org/apache/hugegraph/store/boot/StoreNodeServer00.java
index dc860826d..fd70b3f9f 100644
--- a/hugegraph-store/hg-store-node/src/test/java/org/apache/hugegraph/store/boot/StoreNodeServer00.java
+++ b/hugegraph-store/hg-store-node/src/test/java/org/apache/hugegraph/store/boot/StoreNodeServer00.java
@@ -25,6 +25,15 @@ import org.springframework.boot.SpringApplication;
 
 import com.alipay.remoting.util.StringUtils;
 
+/**
+ *
+ * TODO: what pattern does Spring Boot use to organize services?
+ *       a Service?
+ *
+ *  How are services organized with Spring Boot + gRPC?
+ *
+ *
+ */
 public class StoreNodeServer00 {
 
     public static void main(String[] args) {
diff --git a/hugegraph-store/hg-store-rocksdb/src/main/java/org/apache/hugegraph/rocksdb/access/SessionOperatorImpl.java b/hugegraph-store/hg-store-rocksdb/src/main/java/org/apache/hugegraph/rocksdb/access/SessionOperatorImpl.java
index 45d171404..94fa265a9 100644
--- a/hugegraph-store/hg-store-rocksdb/src/main/java/org/apache/hugegraph/rocksdb/access/SessionOperatorImpl.java
+++ b/hugegraph-store/hg-store-rocksdb/src/main/java/org/apache/hugegraph/rocksdb/access/SessionOperatorImpl.java
@@ -38,9 +38,9 @@ import lombok.extern.slf4j.Slf4j;
 @Slf4j
 public class SessionOperatorImpl implements SessionOperator {
 
-    private final RocksDB db;
+    private final RocksDB db;// GET/SCAN queries go through this object's interface
     private final RocksDBSession session;
-    private WriteBatch batch;
+    private WriteBatch batch;// TODO: not yet clear how put is executed after being chained into the batch
 
     public SessionOperatorImpl(RocksDBSession session) {
         this.session = session;
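For reference, a minimal standalone sketch (not part of the commit above) of the "split the data by partition" step that the batch() comments in HgStoreSessionImpl point at: entries are grouped by the partition id of their start key before each group is handed to its raft group. Entry and the partition lookup below are simplified stand-ins, not the real PB-generated BatchEntry or the PD client.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.ToIntFunction;

public class BatchGroupingSketch {

    // Simplified stand-in for the PB-defined BatchEntry (start key + value).
    record Entry(byte[] startKey, byte[] value) {}

    // Group entries by the partition id of their start key, mirroring the
    // "split the data by partition" step; the key -> partition id lookup
    // (normally resolved via PD) is passed in as a function.
    static Map<Integer, List<Entry>> groupByPartition(List<Entry> entries,
                                                      ToIntFunction<byte[]> partitionOf) {
        Map<Integer, List<Entry>> groups = new HashMap<>();
        for (Entry entry : entries) {
            int partitionId = partitionOf.applyAsInt(entry.startKey());
            groups.computeIfAbsent(partitionId, id -> new ArrayList<>()).add(entry);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Entry> entries = List.of(new Entry(new byte[]{1, 2}, new byte[]{9}),
                                      new Entry(new byte[]{3, 4}, new byte[]{8}));
        // Toy partition function: hash the key into 4 partitions.
        Map<Integer, List<Entry>> groups =
                groupByPartition(entries, key -> Math.floorMod(java.util.Arrays.hashCode(key), 4));
        // Each group would then be submitted to its own raft group, with a closure
        // (BatchGrpcClosure in the annotated code) collecting the per-partition results.
        groups.forEach((partition, batch) ->
                System.out.println("partition " + partition + " -> " + batch.size() + " entries"));
    }
}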
