This is an automated email from the ASF dual-hosted git repository. vgalaxies pushed a commit to branch intro-pd-core in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git
commit d40b270dc3479662e20c92a45ff37f5e4ece84fc Author: VGalaxies <[email protected]> AuthorDate: Thu Mar 14 14:06:59 2024 +0800 format hg-pd-core --- hugegraph-pd/hg-pd-core/pom.xml | 2 +- .../src/main/java/org/apache/hugegraph/pd/ConfigService.java | 1 - .../src/main/java/org/apache/hugegraph/pd/KvService.java | 1 - .../org/apache/hugegraph/pd/PartitionInstructionListener.java | 1 + .../main/java/org/apache/hugegraph/pd/PartitionService.java | 1 - .../java/org/apache/hugegraph/pd/PartitionStatusListener.java | 1 + .../src/main/java/org/apache/hugegraph/pd/RegistryService.java | 1 + .../java/org/apache/hugegraph/pd/ShardGroupStatusListener.java | 1 + .../java/org/apache/hugegraph/pd/StoreMonitorDataService.java | 3 +-- .../main/java/org/apache/hugegraph/pd/StoreNodeService.java | 1 - .../main/java/org/apache/hugegraph/pd/TaskScheduleService.java | 10 +--------- .../src/main/java/org/apache/hugegraph/pd/config/PDConfig.java | 8 ++++++-- .../java/org/apache/hugegraph/pd/meta/ConfigMetaStore.java | 2 -- .../main/java/org/apache/hugegraph/pd/meta/IdMetaStore.java | 1 - .../java/org/apache/hugegraph/pd/meta/MetadataFactory.java | 1 - .../java/org/apache/hugegraph/pd/meta/MetadataKeyHelper.java | 1 + .../java/org/apache/hugegraph/pd/meta/MetadataStoreBase.java | 1 - .../main/java/org/apache/hugegraph/pd/meta/PartitionMeta.java | 2 +- .../src/main/java/org/apache/hugegraph/pd/meta/QueueStore.java | 1 + .../main/java/org/apache/hugegraph/pd/meta/StoreInfoMeta.java | 1 + .../main/java/org/apache/hugegraph/pd/meta/TaskInfoMeta.java | 1 + .../org/apache/hugegraph/pd/raft/FutureClosureAdapter.java | 1 + .../src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java | 1 + .../main/java/org/apache/hugegraph/pd/raft/RaftRpcClient.java | 1 + .../java/org/apache/hugegraph/pd/raft/RaftRpcProcessor.java | 5 ++++- .../java/org/apache/hugegraph/pd/raft/RaftStateListener.java | 1 + .../java/org/apache/hugegraph/pd/raft/RaftStateMachine.java | 5 ++--- .../java/org/apache/hugegraph/pd/raft/RaftTaskHandler.java | 1 + .../java/org/apache/hugegraph/pd/store/BaseKVStoreClosure.java | 2 +- .../src/main/java/org/apache/hugegraph/pd/store/HgKVStore.java | 1 + .../main/java/org/apache/hugegraph/pd/store/HgKVStoreImpl.java | 2 +- .../src/main/java/org/apache/hugegraph/pd/store/KV.java | 2 +- .../main/java/org/apache/hugegraph/pd/store/RaftKVStore.java | 2 -- 33 files changed, 33 insertions(+), 33 deletions(-) diff --git a/hugegraph-pd/hg-pd-core/pom.xml b/hugegraph-pd/hg-pd-core/pom.xml index e59b5ac35..1f23259d2 100644 --- a/hugegraph-pd/hg-pd-core/pom.xml +++ b/hugegraph-pd/hg-pd-core/pom.xml @@ -37,7 +37,7 @@ <dependency> <groupId>com.alipay.sofa</groupId> <artifactId>jraft-core</artifactId> - <!-- TODO: use open source version & adopt the code later --> + <!-- TODO: use open source version & adopt the code later --> <version>1.3.13</version> <exclusions> <exclusion> diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/ConfigService.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/ConfigService.java index 2557745c8..cc28c1b0a 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/ConfigService.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/ConfigService.java @@ -41,7 +41,6 @@ public class ConfigService implements RaftStateListener { meta = MetadataFactory.newConfigMeta(config); } - public Metapb.PDConfig getPDConfig(long version) throws PDException { return this.meta.getPdConfig(version); } diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/KvService.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/KvService.java index e85cfcb1e..f31196f81 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/KvService.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/KvService.java @@ -43,7 +43,6 @@ import lombok.extern.slf4j.Slf4j; @Service public class KvService { - public static final char KV_DELIMITER = '@'; // TODO 主前缀之后,增加类名做区分 private static final String TTL_PREFIX = "T"; diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/PartitionInstructionListener.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/PartitionInstructionListener.java index 2188f6ca7..2b1e4a637 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/PartitionInstructionListener.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/PartitionInstructionListener.java @@ -31,6 +31,7 @@ import org.apache.hugegraph.pd.grpc.pulse.TransferLeader; * 分区命令监听 */ public interface PartitionInstructionListener { + void changeShard(Metapb.Partition partition, ChangeShard changeShard) throws PDException; void transferLeader(Metapb.Partition partition, TransferLeader transferLeader) throws diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/PartitionService.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/PartitionService.java index 9291a813c..c8ec3e3e7 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/PartitionService.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/PartitionService.java @@ -991,7 +991,6 @@ public class PartitionService implements RaftStateListener { } } - public void addInstructionListener(PartitionInstructionListener event) { instructionListeners.add(event); } diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/PartitionStatusListener.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/PartitionStatusListener.java index 933822f10..fea0ce35d 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/PartitionStatusListener.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/PartitionStatusListener.java @@ -23,6 +23,7 @@ import org.apache.hugegraph.pd.grpc.Metapb; * 分区状态监听 */ public interface PartitionStatusListener { + void onPartitionChanged(Metapb.Partition partition, Metapb.Partition newPartition); void onPartitionRemoved(Metapb.Partition partition); diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/RegistryService.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/RegistryService.java index 223889cad..86922d56d 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/RegistryService.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/RegistryService.java @@ -26,6 +26,7 @@ import org.apache.hugegraph.pd.meta.DiscoveryMetaStore; import org.apache.hugegraph.pd.meta.MetadataFactory; public class RegistryService { + private final PDConfig pdConfig; private final DiscoveryMetaStore meta; diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/ShardGroupStatusListener.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/ShardGroupStatusListener.java index 342a335ff..d5c068de9 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/ShardGroupStatusListener.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/ShardGroupStatusListener.java @@ -20,6 +20,7 @@ package org.apache.hugegraph.pd; import org.apache.hugegraph.pd.grpc.Metapb; public interface ShardGroupStatusListener { + void onShardListChanged(Metapb.ShardGroup shardGroup, Metapb.ShardGroup newShardGroup); void onShardListOp(Metapb.ShardGroup shardGroup); diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/StoreMonitorDataService.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/StoreMonitorDataService.java index 7be54db0c..54ff6b6e8 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/StoreMonitorDataService.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/StoreMonitorDataService.java @@ -36,10 +36,10 @@ import org.springframework.stereotype.Service; import lombok.extern.slf4j.Slf4j; - @Slf4j @Service public class StoreMonitorDataService { + private static final String MONITOR_DATA_PREFIX = "SMD"; private final PDConfig pdConfig; private final KvService kvService; @@ -49,7 +49,6 @@ public class StoreMonitorDataService { */ private final Map<Long, Long> lastStoreStateTimestamp; - public StoreMonitorDataService(PDConfig pdConfig) { this.pdConfig = pdConfig; this.kvService = new KvService(pdConfig); diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/StoreNodeService.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/StoreNodeService.java index bfd4f8803..b75532634 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/StoreNodeService.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/StoreNodeService.java @@ -47,7 +47,6 @@ import com.google.gson.Gson; import lombok.extern.slf4j.Slf4j; - /** * HgStore注册、保活管理类 */ diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/TaskScheduleService.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/TaskScheduleService.java index 9ec8152a0..889e5a023 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/TaskScheduleService.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/TaskScheduleService.java @@ -43,7 +43,6 @@ import org.apache.hugegraph.pd.raft.RaftEngine; import lombok.extern.slf4j.Slf4j; - /** * 任务调度服务,定时检查Store、资源、分区的状态,及时迁移数据,错误节点 * 1、监测Store是否离线 @@ -53,6 +52,7 @@ import lombok.extern.slf4j.Slf4j; */ @Slf4j public class TaskScheduleService { + private static final String BALANCE_SHARD_KEY = "BALANCE_SHARD_KEY"; private final long TurnOffAndBalanceInterval = 30 * 60 * 1000; //机器下线30后才能进行动态平衡 private final long BalanceLeaderInterval = 30 * 1000; // leader平衡时间间隔 @@ -82,7 +82,6 @@ public class TaskScheduleService { private long lastStoreTurnoffTime = 0; private long lastBalanceLeaderTime = 0; - public TaskScheduleService(PDConfig config, StoreNodeService storeService, PartitionService partitionService) { this.pdConfig = config; @@ -239,7 +238,6 @@ public class TaskScheduleService { return changedStores; } - /** * 巡查所有的分区,检查副本数是否正确 */ @@ -278,7 +276,6 @@ public class TaskScheduleService { return null; } - /** * 在Store之间平衡分区的数量 * 机器转为UP半小时后才能进行动态平衡 @@ -295,7 +292,6 @@ public class TaskScheduleService { return null;//机器下线半小时后才能进行动态平衡 } - int activeStores = storeService.getActiveStores().size(); if (activeStores == 0) { log.warn("balancePartitionShard non active stores, skip to balancePartitionShard"); @@ -565,7 +561,6 @@ public class TaskScheduleService { return results; } - private long getMaxIndexGap(Map<Integer, Map<Long, Long>> committedIndexMap, int partitionId) { long maxGap = Long.MAX_VALUE; if (committedIndexMap == null || !committedIndexMap.containsKey(partitionId)) { @@ -585,7 +580,6 @@ public class TaskScheduleService { return maxGap; } - /** * 执行分区分裂,分为自动分裂和手工分裂 * @@ -663,7 +657,6 @@ public class TaskScheduleService { return null; } - /** * Store汇报任务状态 * 分区状态发生改变,重新计算分区所在的ShardGroup、图和整个集群的状态 @@ -849,5 +842,4 @@ public class TaskScheduleService { return movedPartitions; } - } diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/config/PDConfig.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/config/PDConfig.java index abc54a94d..6ff66459e 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/config/PDConfig.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/config/PDConfig.java @@ -32,7 +32,6 @@ import org.springframework.stereotype.Component; import lombok.Data; - /** * PD配置文件 */ @@ -114,6 +113,7 @@ public class PDConfig { @Data @Configuration public class ThreadPoolGrpc { + @Value("${thread.pool.grpc.core:600}") private int core; @Value("${thread.pool.grpc.max:1000}") @@ -125,6 +125,7 @@ public class PDConfig { @Data @Configuration public class Raft { + @Value("${raft.enable:true }") private boolean enable; @Value("${raft.address}") @@ -155,6 +156,7 @@ public class PDConfig { @Data @Configuration public class Store { + // store 心跳超时时间 @Value("${store.keepAlive-timeout:300}") private long keepAliveTimeout = 300; @@ -201,7 +203,7 @@ public class PDConfig { private Long parseTimeExpression(String exp) { if (exp != null) { Pattern pattern = Pattern.compile( - "(?<n>(\\d+)*)(\\s)*(?<unit>(second|minute|hour|day|month|year)$)"); + "(?<n>(\\d+)*)(\\s)*(?<unit>(second|minute|hour|day|month|year)$)"); Matcher matcher = pattern.matcher(exp.trim()); if (matcher.find()) { String n = matcher.group("n"); @@ -244,6 +246,7 @@ public class PDConfig { @Data @Configuration public class Partition { + private int totalCount = 0; // 每个Store最大副本数 @@ -269,6 +272,7 @@ public class PDConfig { @Data @Configuration public class Discovery { + // 客户端注册后,无心跳最长次数,超过后,之前的注册信息会被删除 @Value("${discovery.heartbeat-try-count:3}") private int heartbeatOutTimes = 3; diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/ConfigMetaStore.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/ConfigMetaStore.java index 5ec6fe217..df332f46b 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/ConfigMetaStore.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/ConfigMetaStore.java @@ -26,7 +26,6 @@ import org.apache.hugegraph.pd.grpc.Metapb; public class ConfigMetaStore extends MetadataRocksDBStore { - private final long clusterId; public ConfigMetaStore(PDConfig pdConfig) { @@ -69,5 +68,4 @@ public class ConfigMetaStore extends MetadataRocksDBStore { return max.isPresent() ? max.get() : null; } - } diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/IdMetaStore.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/IdMetaStore.java index 70e4c501f..177e4255b 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/IdMetaStore.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/IdMetaStore.java @@ -40,7 +40,6 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public class IdMetaStore extends MetadataRocksDBStore { - private static final String ID_PREFIX = "@ID@"; private static final String CID_PREFIX = "@CID@"; private static final String CID_SLOT_PREFIX = "@CID_SLOT@"; diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/MetadataFactory.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/MetadataFactory.java index c70eec489..cc247041c 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/MetadataFactory.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/MetadataFactory.java @@ -76,7 +76,6 @@ public class MetadataFactory { return new TaskInfoMeta(pdConfig); } - public static QueueStore newQueueStore(PDConfig pdConfig) { return new QueueStore(pdConfig); } diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/MetadataKeyHelper.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/MetadataKeyHelper.java index 8a421c2d6..2b29734c2 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/MetadataKeyHelper.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/MetadataKeyHelper.java @@ -345,6 +345,7 @@ public class MetadataKeyHelper { } static class StringBuilderHelper { + private static final int DISCARD_LIMIT = 1024 << 3; // 8k private static final ThreadLocal<StringBuilderHolder> holderThreadLocal = ThreadLocal diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/MetadataStoreBase.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/MetadataStoreBase.java index 10c38a3ec..4cd9e1d36 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/MetadataStoreBase.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/MetadataStoreBase.java @@ -17,7 +17,6 @@ package org.apache.hugegraph.pd.meta; - import java.io.IOException; import java.util.LinkedList; import java.util.List; diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/PartitionMeta.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/PartitionMeta.java index 09a4eb8e2..713a0046d 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/PartitionMeta.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/PartitionMeta.java @@ -32,6 +32,7 @@ import lombok.extern.slf4j.Slf4j; */ @Slf4j public class PartitionMeta extends MetadataRocksDBStore { + static String CID_GRAPH_ID_KEY = "GraphID"; static int CID_GRAPH_ID_MAX = 0xFFFE; private final PDConfig pdConfig; @@ -233,7 +234,6 @@ public class PartitionMeta extends MetadataRocksDBStore { return getOne(Metapb.PartitionStats.parser(), prefix); } - /** * 获取分区状态 */ diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/QueueStore.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/QueueStore.java index 74820ab02..e1b8437a4 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/QueueStore.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/QueueStore.java @@ -27,6 +27,7 @@ import org.apache.hugegraph.pd.raft.RaftEngine; import org.apache.hugegraph.pd.store.RaftKVStore; public class QueueStore extends MetadataRocksDBStore { + QueueStore(PDConfig pdConfig) { super(pdConfig); } diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/StoreInfoMeta.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/StoreInfoMeta.java index 45959211d..2a50b0448 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/StoreInfoMeta.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/StoreInfoMeta.java @@ -32,6 +32,7 @@ import lombok.extern.slf4j.Slf4j; */ @Slf4j public class StoreInfoMeta extends MetadataRocksDBStore { + private final PDConfig pdConfig; public StoreInfoMeta(PDConfig pdConfig) { diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/TaskInfoMeta.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/TaskInfoMeta.java index 148101de4..756be71e9 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/TaskInfoMeta.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/TaskInfoMeta.java @@ -30,6 +30,7 @@ import org.apache.hugegraph.pd.grpc.pulse.SplitPartition; * 任务管理 */ public class TaskInfoMeta extends MetadataRocksDBStore { + public TaskInfoMeta(PDConfig pdConfig) { super(pdConfig); } diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/FutureClosureAdapter.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/FutureClosureAdapter.java index 1991a78db..d90c50c6c 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/FutureClosureAdapter.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/FutureClosureAdapter.java @@ -23,6 +23,7 @@ import com.alipay.sofa.jraft.Closure; import com.alipay.sofa.jraft.Status; public class FutureClosureAdapter<T> implements Closure { + public final CompletableFuture<T> future = new CompletableFuture<>(); private T resp; diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java index f3089ed07..9ed62b0e6 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java @@ -55,6 +55,7 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public class RaftEngine { + private static final RaftEngine INSTANCE = new RaftEngine(); private final RaftStateMachine stateMachine; private PDConfig.Raft config; diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftRpcClient.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftRpcClient.java index 2e17a65ee..6e47ce4e5 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftRpcClient.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftRpcClient.java @@ -33,6 +33,7 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public class RaftRpcClient { + protected volatile RpcClient rpcClient; private RpcOptions rpcOptions; diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftRpcProcessor.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftRpcProcessor.java index 1286515de..ed950a4ee 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftRpcProcessor.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftRpcProcessor.java @@ -27,7 +27,6 @@ import lombok.Data; public class RaftRpcProcessor<T extends RaftRpcProcessor.BaseRequest> implements RpcProcessor<T> { - private final Class<?> requestClass; private final RaftEngine raftEngine; @@ -97,6 +96,7 @@ public class RaftRpcProcessor<T extends RaftRpcProcessor.BaseRequest> implements } public abstract static class BaseRequest implements Serializable { + public static final byte GET_GRPC_ADDRESS = 0x01; public abstract byte magic(); @@ -104,12 +104,14 @@ public class RaftRpcProcessor<T extends RaftRpcProcessor.BaseRequest> implements @Data public abstract static class BaseResponse implements Serializable { + private Status status; } @Data public static class GetMemberRequest extends BaseRequest { + @Override public byte magic() { return GET_GRPC_ADDRESS; @@ -118,6 +120,7 @@ public class RaftRpcProcessor<T extends RaftRpcProcessor.BaseRequest> implements @Data public static class GetMemberResponse extends BaseResponse { + private long clusterId; private String raftAddress; private String grpcAddress; diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftStateListener.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftStateListener.java index 020be6f8b..56f39e3ad 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftStateListener.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftStateListener.java @@ -18,5 +18,6 @@ package org.apache.hugegraph.pd.raft; public interface RaftStateListener { + void onRaftLeaderChanged(); } diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftStateMachine.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftStateMachine.java index 473321202..dafa209cb 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftStateMachine.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftStateMachine.java @@ -48,6 +48,7 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public class RaftStateMachine extends StateMachineAdapter { + private static final String SNAPSHOT_DIR_NAME = "snapshot"; private static final String SNAPSHOT_ARCHIVE_NAME = "snapshot.zip"; private final AtomicLong leaderTerm = new AtomicLong(-1); @@ -147,7 +148,6 @@ public class RaftStateMachine extends StateMachineAdapter { super.onStopFollowing(ctx); } - @Override public void onConfigurationCommitted(final Configuration conf) { log.info("Raft onConfigurationCommitted {}", conf); @@ -235,7 +235,6 @@ public class RaftStateMachine extends StateMachineAdapter { return false; } - try { // TODO: remove file from meta // SnapshotReader 沒有提供刪除文件的接口 @@ -288,8 +287,8 @@ public class RaftStateMachine extends StateMachineAdapter { } } - public static class RaftClosureAdapter implements KVStoreClosure { + private final KVOperation op; private final KVStoreClosure closure; diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftTaskHandler.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftTaskHandler.java index 6dfced4c9..ec8120cc8 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftTaskHandler.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftTaskHandler.java @@ -23,5 +23,6 @@ import org.apache.hugegraph.pd.common.PDException; * 接收raft发送的数据 */ public interface RaftTaskHandler { + boolean invoke(final KVOperation op, KVStoreClosure response) throws PDException; } diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/store/BaseKVStoreClosure.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/store/BaseKVStoreClosure.java index 84974aea2..3cc4dbb54 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/store/BaseKVStoreClosure.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/store/BaseKVStoreClosure.java @@ -21,6 +21,7 @@ import org.apache.hugegraph.pd.grpc.Pdpb; import org.apache.hugegraph.pd.raft.KVStoreClosure; public abstract class BaseKVStoreClosure implements KVStoreClosure { + private Pdpb.Error error; private Object data; @@ -44,5 +45,4 @@ public abstract class BaseKVStoreClosure implements KVStoreClosure { this.data = data; } - } diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/store/HgKVStore.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/store/HgKVStore.java index bfa2f1ded..263cb70b2 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/store/HgKVStore.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/store/HgKVStore.java @@ -24,6 +24,7 @@ import org.apache.hugegraph.pd.common.PDException; import org.apache.hugegraph.pd.config.PDConfig; public interface HgKVStore { + void init(PDConfig config); void put(byte[] key, byte[] value) throws PDException; diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/store/HgKVStoreImpl.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/store/HgKVStoreImpl.java index 88ebd5ca2..bd2e7a9e2 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/store/HgKVStoreImpl.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/store/HgKVStoreImpl.java @@ -52,6 +52,7 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public class HgKVStoreImpl implements HgKVStore { + private static final ConcurrentHashMap<String, ConcurrentMap<String, Object>> CACHE = new ConcurrentHashMap(); private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); @@ -324,7 +325,6 @@ public class HgKVStoreImpl implements HgKVStore { closeRocksDB(); } - private void closeRocksDB() { if (this.db != null) { this.db.close(); diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/store/KV.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/store/KV.java index 763a8541a..35dce065b 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/store/KV.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/store/KV.java @@ -17,8 +17,8 @@ package org.apache.hugegraph.pd.store; - public class KV { + private byte[] key; private byte[] value; diff --git a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/store/RaftKVStore.java b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/store/RaftKVStore.java index b9e373ce8..ed97d13f7 100644 --- a/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/store/RaftKVStore.java +++ b/hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/store/RaftKVStore.java @@ -37,7 +37,6 @@ import com.alipay.sofa.jraft.error.RaftError; import lombok.extern.slf4j.Slf4j; - @Slf4j public class RaftKVStore implements HgKVStore, RaftTaskHandler { @@ -103,7 +102,6 @@ public class RaftKVStore implements HgKVStore, RaftTaskHandler { return 0; } - @Override public long removeByPrefix(byte[] bytes) throws PDException { try {
