javeme commented on code in PR #2478:
URL: 
https://github.com/apache/incubator-hugegraph/pull/2478#discussion_r1536631952


##########
hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/MetadataRocksDBStore.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.meta;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hugegraph.pd.common.PDException;
+import org.apache.hugegraph.pd.config.PDConfig;
+import org.apache.hugegraph.pd.grpc.Pdpb;
+import org.apache.hugegraph.pd.store.HgKVStore;
+import org.apache.hugegraph.pd.store.KV;
+
+import com.google.protobuf.Parser;
+
+public class MetadataRocksDBStore extends MetadataStoreBase {
+
+    HgKVStore store;
+
+    PDConfig pdConfig;
+
+    /**
+     * Creates a metadata store bound to the given PD configuration.
+     *
+     * @param pdConfig configuration passed to {@code MetadataFactory.getStore} to obtain the
+     *                 backing {@link HgKVStore}; it is also retained so that
+     *                 {@link #getStore()} can resolve the store again later
+     */
+    public MetadataRocksDBStore(PDConfig pdConfig) {
+        this.pdConfig = pdConfig;
+        this.store = MetadataFactory.getStore(pdConfig);
+    }
+
+    /**
+     * Returns the backing key-value store, resolving it from {@code MetadataFactory} with the
+     * retained configuration when the field is still {@code null}.
+     *
+     * @return the backing {@link HgKVStore}
+     */
+    public HgKVStore getStore() {
+        if (this.store != null) {
+            return this.store;
+        }
+        this.store = MetadataFactory.getStore(this.pdConfig);
+        return this.store;
+    }
+
+    /**
+     * Reads the raw value stored under {@code key}.
+     *
+     * @param key key to look up
+     * @return whatever {@code store.get(key)} returns
+     * @throws PDException with {@code ROCKSDB_READ_ERROR_VALUE} wrapping any exception raised
+     *                     by the underlying store
+     */
+    @Override
+    public byte[] getOne(byte[] key) throws PDException {
+        try {
+            return store.get(key);
+        } catch (Exception cause) {
+            throw new PDException(Pdpb.ErrorType.ROCKSDB_READ_ERROR_VALUE, cause);
+        }
+    }
+
+    /**
+     * Reads the value stored under {@code key} and decodes it with {@code parser}.
+     *
+     * @param parser protobuf parser used to decode the stored bytes
+     * @param key    key to look up
+     * @return the decoded message, or {@code null} when the stored value is null or empty
+     * @throws PDException with {@code ROCKSDB_READ_ERROR_VALUE} wrapping any read or parse
+     *                     failure
+     */
+    @Override
+    public <E> E getOne(Parser<E> parser, byte[] key) throws PDException {
+        try {
+            byte[] raw = store.get(key);
+            return ArrayUtils.isEmpty(raw) ? null : parser.parseFrom(raw);
+        } catch (Exception cause) {
+            throw new PDException(Pdpb.ErrorType.ROCKSDB_READ_ERROR_VALUE, cause);
+        }
+    }
+
+    @Override
+    public void put(byte[] key, byte[] value) throws PDException {
+        try {
+            getStore().put(key, value);
+        } catch (Exception e) {
+            throw new PDException(Pdpb.ErrorType.ROCKSDB_WRITE_ERROR_VALUE, e);
+        }
+    }
+
+    @Override
+    public void putWithTTL(byte[] key, byte[] value, long ttl) throws 
PDException {
+        this.store.putWithTTL(key, value, ttl);
+    }
+
+    @Override
+    public void putWithTTL(byte[] key, byte[] value, long ttl, TimeUnit 
timeUnit) throws
+                                                                               
   PDException {
+        this.store.putWithTTL(key, value, ttl, timeUnit);
+    }
+
+    @Override
+    public byte[] getWithTTL(byte[] key) throws PDException {
+        return this.store.getWithTTL(key);
+    }
+
+    @Override
+    public List getListWithTTL(byte[] key) throws PDException {
+        return this.store.getListWithTTL(key);
+    }
+
+    @Override
+    public void removeWithTTL(byte[] key) throws PDException {
+        this.store.removeWithTTL(key);
+    }
+
+    /**
+     * Returns the key-value pairs the underlying store yields for {@code prefix}.
+     *
+     * @param prefix key prefix to scan
+     * @return the list returned by {@code store.scanPrefix(prefix)}
+     * @throws PDException with {@code ROCKSDB_READ_ERROR_VALUE} wrapping any exception raised
+     *                     by the underlying store
+     */
+    @Override
+    public List<KV> scanPrefix(byte[] prefix) throws PDException {
+        // TODO: use the RocksDB prefix query
+        try {
+            List<KV> matches = store.scanPrefix(prefix);
+            return matches;
+        } catch (Exception cause) {
+            throw new PDException(Pdpb.ErrorType.ROCKSDB_READ_ERROR_VALUE, cause);
+        }
+    }
+
+    /**
+     * Delegates to {@code store.scanRange(start, end)}.
+     *
+     * @param start start key of the range
+     * @param end   end key of the range
+     * @return the list returned by the underlying store
+     * @throws PDException propagated from the underlying store
+     */
+    @Override
+    public List<KV> scanRange(byte[] start, byte[] end) throws PDException {
+        return store.scanRange(start, end);
+    }
+
+    @Override
+    public <E> List<E> scanRange(Parser<E> parser, byte[] start, byte[] end) 
throws PDException {
+        List<E> stores = new LinkedList<>();
+        try {
+            List<KV> kvs = this.scanRange(start, end);
+            for (KV keyValue : kvs) {
+                stores.add(parser.parseFrom(keyValue.getValue()));
+            }
+        } catch (Exception e) {
+            throw new PDException(Pdpb.ErrorType.ROCKSDB_READ_ERROR_VALUE, e);
+        }
+        return stores;
+    }
+
+    @Override
+    public <E> List<E> scanPrefix(Parser<E> parser, byte[] prefix) throws 
PDException {
+        List<E> stores = new LinkedList<>();
+        try {
+            List<KV> kvs = this.scanPrefix(prefix);
+            for (KV keyValue : kvs) {
+                stores.add(parser.parseFrom(keyValue.getValue()));
+            }
+        } catch (Exception e) {
+            throw new PDException(Pdpb.ErrorType.ROCKSDB_READ_ERROR_VALUE, e);
+        }
+        return stores;
+    }
+
+    @Override
+    public boolean containsKey(byte[] key) throws PDException {
+        return !ArrayUtils.isEmpty(store.get(key));
+    }
+
+    @Override
+    public long remove(byte[] key) throws PDException {
+        try {
+            return this.store.remove(key);
+        } catch (Exception e) {
+            throw new PDException(Pdpb.ErrorType.ROCKSDB_WRITE_ERROR_VALUE, e);
+        }
+    }
+
+    @Override
+    public long removeByPrefix(byte[] prefix) throws PDException {
+        try {
+            return this.store.removeByPrefix(prefix);
+        } catch (Exception e) {
+            throw new PDException(Pdpb.ErrorType.ROCKSDB_WRITE_ERROR_VALUE, e);
+        }
+    }
+
+    /**
+     * Delegates to {@code store.clear()}.
+     *
+     * @throws PDException propagated from the underlying store
+     */
+    @Override
+    public void clearAllCache() throws PDException {
+        store.clear();
+    }
+
+    @Override
+    public void close() {
+

Review Comment:
   The body of `close()` is missing: nothing is actually closed here.



##########
hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/QueueStore.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.meta;
+
+import java.util.List;
+
+import org.apache.hugegraph.pd.common.HgAssert;
+import org.apache.hugegraph.pd.common.PDException;
+import org.apache.hugegraph.pd.config.PDConfig;
+import org.apache.hugegraph.pd.grpc.Metapb;
+import org.apache.hugegraph.pd.raft.RaftEngine;
+import org.apache.hugegraph.pd.store.RaftKVStore;
+
+public class QueueStore extends MetadataRocksDBStore {
+
+    QueueStore(PDConfig pdConfig) {

Review Comment:
   Can we declare the access modifier (public/protected) explicitly?



##########
hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/DiscoveryMetaStore.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.meta;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hugegraph.pd.common.PDException;
+import org.apache.hugegraph.pd.config.PDConfig;
+import org.apache.hugegraph.pd.grpc.discovery.NodeInfo;
+import org.apache.hugegraph.pd.grpc.discovery.NodeInfos;
+import org.apache.hugegraph.pd.grpc.discovery.Query;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class DiscoveryMetaStore extends MetadataRocksDBStore {

Review Comment:
   Please keep a consistent naming style: either XxMeta or XxMetaStore.



##########
hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/store/BaseKVStoreClosure.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.store;
+
+import org.apache.hugegraph.pd.grpc.Pdpb;
+import org.apache.hugegraph.pd.raft.KVStoreClosure;
+
+public abstract class BaseKVStoreClosure implements KVStoreClosure {

Review Comment:
   prefer AbstractKVStoreClosure



##########
hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/KvService.java:
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd;
+
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hugegraph.pd.common.PDException;
+import org.apache.hugegraph.pd.config.PDConfig;
+import org.apache.hugegraph.pd.grpc.kv.Kv;
+import org.apache.hugegraph.pd.grpc.kv.V;
+import org.apache.hugegraph.pd.meta.MetadataKeyHelper;
+import org.apache.hugegraph.pd.meta.MetadataRocksDBStore;
+import org.apache.hugegraph.pd.store.KV;
+import org.springframework.stereotype.Service;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ *
+ **/
+@Slf4j
+@Service
+public class KvService {

Review Comment:
   In what scenarios is this service used? Do we need to provide a general
KV service?



##########
hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/store/HgKVStoreImpl.java:
##########
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.store;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hugegraph.pd.common.PDException;
+import org.apache.hugegraph.pd.config.PDConfig;
+import org.apache.hugegraph.pd.grpc.Pdpb;
+import org.apache.hugegraph.pd.grpc.discovery.RegisterInfo;
+import org.rocksdb.Checkpoint;
+import org.rocksdb.Options;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
+
+import com.alipay.sofa.jraft.util.Utils;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.primitives.Bytes;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class HgKVStoreImpl implements HgKVStore {

Review Comment:
   prefer RocksDBHgKVStore



##########
hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/TaskScheduleService.java:
##########
@@ -0,0 +1,845 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.PriorityQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.apache.hugegraph.pd.common.KVPair;
+import org.apache.hugegraph.pd.common.PDException;
+import org.apache.hugegraph.pd.config.PDConfig;
+import org.apache.hugegraph.pd.grpc.MetaTask;
+import org.apache.hugegraph.pd.grpc.Metapb;
+import org.apache.hugegraph.pd.grpc.Pdpb;
+import org.apache.hugegraph.pd.meta.TaskInfoMeta;
+import org.apache.hugegraph.pd.raft.RaftEngine;
+
+import lombok.extern.slf4j.Slf4j;
+
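+/**
+ * Task scheduling service. Periodically checks the state of stores, resources and
+ * partitions, and migrates data away from faulty nodes in time.
+ * 1. Monitors whether a store is offline
+ * 2. Monitors whether the replicas of a partition are correct
+ * 3. Monitors whether the working mode of a partition is correct
+ * 4. Monitors whether a partition needs to be split and whether a split has completed
+ */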
+@Slf4j
+public class TaskScheduleService {
+
+    private static final String BALANCE_SHARD_KEY = "BALANCE_SHARD_KEY";
+    // Dynamic balancing is allowed only 30 minutes after a store was turned off (milliseconds)
+    private final long TurnOffAndBalanceInterval = 30 * 60 * 1000;
+    // Leader balance interval (milliseconds)
+    private final long BalanceLeaderInterval = 30 * 1000;
+    private final PDConfig pdConfig;
+    // Wall-clock time (ms) captured when this service instance was constructed
+    private final long clusterStartTime;
+    private final StoreNodeService storeService;
+    private final PartitionService partitionService;
+    private final ScheduledExecutorService executor;
+    private final TaskInfoMeta taskInfoMeta;
+    private final StoreMonitorDataService storeMonitorDataService;
+    private final KvService kvService;
+    private final LogService logService;
+    // Orders by value ascending, ties broken by key ascending.
+    // The values are boxed Integers, so equality must go through equals(): '==' compares
+    // references and is only reliable inside the small Integer cache range.
+    private final Comparator<KVPair<Long, Integer>> kvPairComparatorAsc = (o1, o2) -> {
+        if (Objects.equals(o1.getValue(), o2.getValue())) {
+            return o1.getKey().compareTo(o2.getKey());
+        }
+        return o1.getValue().compareTo(o2.getValue());
+    };
+    // Orders by value descending, ties broken by key descending.
+    // NOTE(review): the original comment said keys are ordered ascending on ties, but the
+    // code compares o2's key against o1's (descending); the code's behavior is kept here.
+    private final Comparator<KVPair<Long, Integer>> kvPairComparatorDesc = (o1, o2) -> {
+        if (Objects.equals(o1.getValue(), o2.getValue())) {
+            return o2.getKey().compareTo(o1.getKey());
+        }
+        return o2.getValue().compareTo(o1.getValue());
+    };
+    // Time (ms) of the most recent store transition to Tombstone; 0 until one is observed
+    private long lastStoreTurnoffTime = 0;
+    private long lastBalanceLeaderTime = 0;
+
+    /**
+     * Wires the scheduler to its collaborating services and creates the helper services and
+     * the executor it owns. No task is scheduled until {@code init()} is called.
+     *
+     * @param config           PD configuration shared with every helper service created here
+     * @param storeService     store node service used by the patrol and balance tasks
+     * @param partitionService partition service used by the patrol and balance tasks
+     */
+    public TaskScheduleService(PDConfig config, StoreNodeService storeService,
+                               PartitionService partitionService) {
+        this.pdConfig = config;
+        this.storeService = storeService;
+        this.partitionService = partitionService;
+        // Helper services are built from the very same configuration instance
+        this.taskInfoMeta = new TaskInfoMeta(config);
+        this.logService = new LogService(config);
+        this.storeMonitorDataService = new StoreMonitorDataService(config);
+        this.clusterStartTime = System.currentTimeMillis();
+        this.kvService = new KvService(config);
+        // Fixed pool of 16 threads for all periodic and delayed jobs
+        this.executor = new ScheduledThreadPoolExecutor(16);
+    }
+
+    public void init() {
+        executor.scheduleWithFixedDelay(() -> {
+            try {
+                patrolStores();
+            } catch (Throwable e) {
+                log.error("patrolStores exception: ", e);
+            }
+
+        }, 60, 60, TimeUnit.SECONDS);
+        executor.scheduleWithFixedDelay(() -> {
+            try {
+                patrolPartitions();
+                balancePartitionLeader(false);
+                balancePartitionShard();
+            } catch (Throwable e) {
+                log.error("patrolPartitions exception: ", e);
+            }
+        }, pdConfig.getPatrolInterval(), pdConfig.getPatrolInterval(), 
TimeUnit.SECONDS);
+        executor.scheduleWithFixedDelay(() -> {
+            if (isLeader()) {
+                kvService.clearTTLData();
+            }
+        }, 1000, 1000, TimeUnit.MILLISECONDS);
+        executor.scheduleWithFixedDelay(
+                () -> {
+                    if (isLeader()) {
+                        storeService.getQuotaChecker();
+                    }
+                }, 2, 30,
+                TimeUnit.SECONDS);
+        // clean expired monitor data each 10 minutes, delay 3min.
+        if (isLeader() && this.pdConfig.getStore().isMonitorDataEnabled()) {
+            executor.scheduleAtFixedRate(() -> {
+                Long expTill = System.currentTimeMillis() / 1000 -
+                               this.pdConfig.getStore().getRetentionPeriod();
+                log.debug("monitor data keys before " + expTill + " will be 
deleted");
+                int records = 0;
+                try {
+                    for (Metapb.Store store : storeService.getStores()) {
+                        int cnt =
+                                
this.storeMonitorDataService.removeExpiredMonitorData(store.getId(),
+                                                                               
       expTill);
+                        log.debug("store id :{}, records:{}", store.getId(), 
cnt);
+                        records += cnt;
+                    }
+                } catch (PDException e) {
+                    throw new RuntimeException(e);
+                }
+                log.debug(String.format("%d records has been deleted", 
records));
+            }, 180, 600, TimeUnit.SECONDS);
+        }
+
+        storeService.addStatusListener(new StoreStatusListener() {
+            @Override
+            public void onStoreStatusChanged(Metapb.Store store, 
Metapb.StoreState old,
+                                             Metapb.StoreState status) {
+                if (status == Metapb.StoreState.Tombstone) {
+                    lastStoreTurnoffTime = System.currentTimeMillis();
+                }
+
+                if (status == Metapb.StoreState.Up) {
+                    executor.schedule(() -> {
+                        try {  //store 上线后延时1分钟进行leader平衡
+                            balancePartitionLeader(false);
+                        } catch (PDException e) {
+                            log.error("exception {}", e);
+                        }
+                    }, BalanceLeaderInterval, TimeUnit.MILLISECONDS);
+
+                }
+            }
+
+            @Override
+            public void onGraphChange(Metapb.Graph graph,
+                                      Metapb.GraphState stateOld,
+                                      Metapb.GraphState stateNew) {
+
+            }
+
+            @Override
+            public void onStoreRaftChanged(Metapb.Store store) {
+
+            }
+        });
+    }
+
+    /**
+     * Stops the scheduler by calling {@code shutdownNow()} on the internal executor; the
+     * list of never-started tasks it returns is discarded.
+     */
+    public void shutDown() {
+        this.executor.shutdownNow();
+    }
+
+    private boolean isLeader() {
+        return RaftEngine.getInstance().isLeader();
+    }
+
+    /**
+     * Patrols all stores and updates the state of those that are no longer active.
+     * <ul>
+     * <li>An {@code Up} or {@code Unknown} store that is not active is marked
+     * {@code Offline}.</li>
+     * <li>An inactive {@code Exiting} store, or an {@code Offline} store whose last heartbeat
+     * is older than the configured max down time (and the service itself has been running
+     * longer than that), is marked {@code Tombstone}, provided
+     * {@code storeService.checkStoreCanOffline} allows it.</li>
+     * </ul>
+     *
+     * @return the stores whose state was changed, or {@code null} when this node is not the
+     *         leader
+     * @throws PDException propagated from the store service
+     */
+    public List<Metapb.Store> patrolStores() throws PDException {
+        if (!isLeader()) {
+            return null;
+        }
+
+        List<Metapb.Store> changed = new ArrayList<>();
+        // Compare the full store list against the currently active ones
+        List<Metapb.Store> allStores = storeService.getStores("");
+        Map<Long, Metapb.Store> activeById =
+                storeService.getActiveStores("")
+                            .stream()
+                            .collect(Collectors.toMap(Metapb.Store::getId, s -> s));
+
+        for (Metapb.Store candidate : allStores) {
+            final Metapb.StoreState state = candidate.getState();
+            final boolean inactive = !activeById.containsKey(candidate.getId());
+            Metapb.Store updated = null;
+
+            if (inactive &&
+                (state == Metapb.StoreState.Up || state == Metapb.StoreState.Unknown)) {
+                // Not active any more: mark it offline
+                updated = Metapb.Store.newBuilder(candidate)
+                                      .setState(Metapb.StoreState.Offline)
+                                      .build();
+            } else if ((state == Metapb.StoreState.Exiting && inactive) ||
+                       (state == Metapb.StoreState.Offline &&
+                        (System.currentTimeMillis() - candidate.getLastHeartbeat() >
+                         pdConfig.getStore().getMaxDownTime() * 1000) &&
+                        (System.currentTimeMillis() - clusterStartTime >
+                         pdConfig.getStore().getMaxDownTime() * 1000))) {
+                // Manually set to exiting, or offline for longer than the allowed down time:
+                // move to Tombstone, but only if checkStoreCanOffline permits it
+                if (storeService.checkStoreCanOffline(candidate)) {
+                    updated = Metapb.Store.newBuilder(candidate)
+                                          .setState(Metapb.StoreState.Tombstone)
+                                          .build();
+                    this.logService.insertLog(LogService.NODE_CHANGE, LogService.TASK, updated);
+                    log.info("patrolStores store {} Offline", updated.getId());
+                }
+            }
+
+            if (updated == null) {
+                continue;
+            }
+            storeService.updateStore(updated);
+            changed.add(updated);
+        }
+        return changed;
+    }
+
+    /**
+     * Patrols all partitions: re-allocates shards for every shard group whose shard count
+     * differs from the configured one, then takes the shards hosted on tombstoned stores
+     * offline.
+     *
+     * @return always {@code null}, whether or not this node is the leader
+     * @throws PDException propagated from the store, partition or kv services
+     */
+    public List<Metapb.Partition> patrolPartitions() throws PDException {
+        if (!isLeader()) {
+            return null;
+        }
+
+        // Shard count differs from the configured one: re-allocate the shards
+        for (Metapb.ShardGroup shardGroup : storeService.getShardGroups()) {
+            if (shardGroup.getShardsCount() == pdConfig.getPartition().getShardCount()) {
+                continue;
+            }
+            storeService.reallocShards(shardGroup);
+            // Keep the following balancePartitionShard from running right away
+            kvService.put(BALANCE_SHARD_KEY, "DOING", 180 * 1000);
+        }
+
+        // Check whether the shards are online
+        Map<Long, Metapb.Store> tombStores =
+                storeService.getTombStores()
+                            .stream()
+                            .collect(Collectors.toMap(Metapb.Store::getId, s -> s));
+
+        // Every partition is handled at most once, even when several tomb stores host it
+        var handledPartitions = new HashSet<Integer>();
+
+        for (Metapb.Store tombStore : tombStores.values()) {
+            for (var partition : partitionService.getPartitionByStore(tombStore)) {
+                if (!handledPartitions.add(partition.getId())) {
+                    continue;
+                }
+
+                storeService.storeTurnoff(tombStore);
+                partitionService.shardOffline(partition, tombStore.getId());
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * 在Store之间平衡分区的数量
+     * 机器转为UP半小时后才能进行动态平衡
+     */
+    public synchronized Map<Integer, KVPair<Long, Long>> 
balancePartitionShard() throws
+                                                                               
  PDException {
+        log.info("balancePartitions starting, isleader:{}", isLeader());
+
+        if (!isLeader()) {
+            return null;
+        }
+
+        if (System.currentTimeMillis() - lastStoreTurnoffTime < 
TurnOffAndBalanceInterval) {
+            return null;//机器下线半小时后才能进行动态平衡
+        }
+
+        int activeStores = storeService.getActiveStores().size();
+        if (activeStores == 0) {
+            log.warn("balancePartitionShard non active stores, skip to 
balancePartitionShard");
+            return null;
+        }
+
+        // 避免频繁调用. (当改变副本数,需要调整shard list,此时又需要平衡分区)会发送重复的指令。造成结果不可预料。
+        // 严重会删除掉分区.
+        if (Objects.equals(kvService.get(BALANCE_SHARD_KEY), "DOING")) {
+            return null;
+        }
+
+        int totalShards = pdConfig.getConfigService().getPartitionCount() *
+                          pdConfig.getPartition().getShardCount();
+        int averageCount = totalShards / activeStores;
+        int remainder = totalShards % activeStores;
+
+        // 统计每个store上分区, StoreId ->PartitionID, ShardRole
+        Map<Long, Map<Integer, Metapb.ShardRole>> partitionMap = new 
HashMap<>();
+        storeService.getActiveStores().forEach(store -> {
+            partitionMap.put(store.getId(), new HashMap<>());
+        });
+
+        // 如果是leaner 说明迁移正在进行,不要重复提交任务
+        AtomicReference<Boolean> isLeaner = new AtomicReference<>(false);
+        partitionService.getPartitions().forEach(partition -> {
+
+            try {
+                storeService.getShardList(partition.getId()).forEach(shard -> {
+                    Long storeId = shard.getStoreId();
+                    // 判断每个shard为leaner或者状态非正常状态
+                    if (shard.getRole() == Metapb.ShardRole.Learner
+                        || partition.getState() != 
Metapb.PartitionState.PState_Normal) {
+                        isLeaner.set(true);
+                    }
+                    if (partitionMap.containsKey(storeId)) {
+                        partitionMap.get(storeId).put(partition.getId(), 
shard.getRole());
+                    }
+                });
+            } catch (PDException e) {
+                log.error("get partition {} shard list error:{}.", 
partition.getId(),
+                          e.getMessage());
+            }
+        });
+
+        if (isLeaner.get()) {
+            log.warn("balancePartitionShard is doing, skip this 
balancePartitionShard task");
+            return null;
+        }
+
+        // 按照shard数量由高到低排序store
+        List<KVPair<Long, Integer>> sortedList = new ArrayList<>();
+        partitionMap.forEach((storeId, shards) -> {
+            sortedList.add(new KVPair(storeId, shards.size()));
+        });
+        // 由大到小排序的list
+        sortedList.sort(((o1, o2) -> o2.getValue().compareTo(o1.getValue())));
+        // 最大堆
+        PriorityQueue<KVPair<Long, Integer>> maxHeap = new 
PriorityQueue<>(sortedList.size(),
+                                                                           
(o1, o2) -> o2.getValue()
+                                                                               
          .compareTo(
+                                                                               
                  o1.getValue()));
+
+        // 各个副本的 committedIndex
+        Map<Integer, Map<Long, Long>> committedIndexMap = 
partitionService.getCommittedIndexStats();
+        // 分区ID --> 源StoreID,目标StoreID
+        Map<Integer, KVPair<Long, Long>> movedPartitions = new HashMap<>();
+        // 移除多余的shard, 
按照shards由多到少的顺序遍历store,余数remainder优先给shards多的store分配,减少迁移的概率
+        for (int index = 0; index < sortedList.size(); index++) {
+            long storeId = sortedList.get(index).getKey();
+            if (!partitionMap.containsKey(storeId)) {
+                log.error("cannot found storeId {} in partitionMap", storeId);
+                return null;
+            }
+            Map<Integer, Metapb.ShardRole> shards = partitionMap.get(storeId);
+            int targetCount = index < remainder ? averageCount + 1 : 
averageCount;
+            //  移除多余的shard, 添加源StoreID. 非Leader,并且该分区唯一
+            if (shards.size() > targetCount) {
+                int movedCount = shards.size() - targetCount;
+                log.info(
+                        "balancePartitionShard storeId {}, shardsSize {}, 
targetCount {}, " +
+                        "moveCount {}",
+                        storeId, shards.size(), targetCount, movedCount);
+                for (Iterator<Integer> iterator = shards.keySet().iterator();
+                     movedCount > 0 && iterator.hasNext(); ) {
+                    Integer id = iterator.next();
+
+                    if (!movedPartitions.containsKey(id)) {
+                        log.info("store {}, shard of partition {} can be 
moved", storeId, id);
+                        movedPartitions.put(id, new KVPair<>(storeId, 0L));
+                        movedCount--;
+                    }
+                }
+            } else if (shards.size() < targetCount) {
+                int addCount = targetCount - shards.size();
+                log.info(
+                        "balancePartitionShard storeId {}, shardsSize {}, 
targetCount {}, " +
+                        "addCount {}",
+                        storeId, shards.size(), targetCount, addCount);
+                maxHeap.add(new KVPair<>(storeId, addCount));
+            }
+        }
+
+        if (movedPartitions.size() == 0) {
+            log.warn(
+                    "movedPartitions is empty, totalShards:{} averageCount:{} 
remainder:{} " +
+                    "sortedList:{}",
+                    totalShards, averageCount, remainder, sortedList);
+        }
+        Iterator<Map.Entry<Integer, KVPair<Long, Long>>> moveIterator =
+                movedPartitions.entrySet().iterator();
+
+        while (moveIterator.hasNext()) {
+            if (maxHeap.size() == 0) {
+                break;
+            }
+            Map.Entry<Integer, KVPair<Long, Long>> moveEntry = 
moveIterator.next();
+            int partitionId = moveEntry.getKey();
+            long sourceStoreId = moveEntry.getValue().getKey();
+
+            List<KVPair<Long, Integer>> tmpList = new 
ArrayList<>(maxHeap.size());
+            while (maxHeap.size() > 0) {
+                KVPair<Long, Integer> pair = maxHeap.poll();
+                long destStoreId = pair.getKey();
+                boolean destContains = false;
+                if (partitionMap.containsKey(destStoreId)) {
+                    destContains = 
partitionMap.get(destStoreId).containsKey(partitionId);
+                }
+                // 如果目的store已经包含了该partition,则取一下store
+                if (!destContains) {
+                    moveEntry.getValue().setValue(pair.getKey());
+                    log.info(
+                            "balancePartitionShard will move partition {} from 
store {} to store " +
+                            "{}",
+                            moveEntry.getKey(),
+                            moveEntry.getValue().getKey(),
+                            moveEntry.getValue().getValue());
+                    if (pair.getValue() > 1) {
+                        pair.setValue(pair.getValue() - 1);
+                        tmpList.add(pair);
+                    }
+                    break;
+                }
+                tmpList.add(pair);
+            }
+            maxHeap.addAll(tmpList);
+        }
+
+        kvService.put(BALANCE_SHARD_KEY, "DOING", 180 * 1000);
+
+        // 开始迁移
+        movedPartitions.forEach((partId, storePair) -> {
+            // 源和目标storeID都不为0
+            if (storePair.getKey() > 0 && storePair.getValue() > 0) {
+                partitionService.movePartitionsShard(partId, 
storePair.getKey(),
+                                                     storePair.getValue());
+            } else {
+                log.warn("balancePartitionShard key or value is zero, 
partId:{} storePair:{}",
+                         partId, storePair);
+            }
+        });
+        return movedPartitions;
+    }
+
+    /**
+     * Balance the number of partition leaders among the stores.
+     * <p>
+     * Returns an empty map without doing anything when this node is not the
+     * leader, when {@code immediately} is false and less than
+     * {@code BalanceLeaderInterval} ms have passed since the last run, or when
+     * there are no shard groups.
+     * <p>
+     * Otherwise each store gets a target leader count of
+     * {@code max(shardsOnStore / shardCount, 1)}; the last store of the sorted
+     * list takes {@code shardGroups.size() - sum} so that the targets add up to
+     * the number of shard groups. For every shard group, stores are polled from
+     * the target queue until one is found that holds a shard of the group; if
+     * that shard is not already the leader,
+     * {@code partitionService.transferLeader} is called for it.
+     * <p>
+     * NOTE(review): if no shard is counted at all (every group has an empty
+     * shard list), {@code sortedGroups} is empty and
+     * {@code sortedGroups.get(sortedGroups.size() - 1)} would presumably throw;
+     * confirm that every shard group always has at least one shard.
+     * NOTE(review): {@code Collectors.toMap} without a merge function assumes a
+     * store holds at most one shard of a group; confirm.
+     *
+     * @param immediately when true, skip the minimum-interval check
+     * @return shard group id to store id, for every group where a leader
+     *         transfer was requested
+     * @throws PDException with code 1001 when a split or move task is running,
+     *                     or when the value under BALANCE_SHARD_KEY is "DOING"
+     */
+    public synchronized Map<Integer, Long> balancePartitionLeader(boolean 
immediately) throws
+                                                                               
        PDException {
+        Map<Integer, Long> results = new HashMap<>();
+
+        if (!isLeader()) {
+            return results;
+        }
+
+        if (!immediately &&
+            System.currentTimeMillis() - lastBalanceLeaderTime < 
BalanceLeaderInterval) {
+            return results;
+        }
+        lastBalanceLeaderTime = System.currentTimeMillis();
+
+        List<Metapb.ShardGroup> shardGroups = storeService.getShardGroups();
+
+        // Exit while a split or scale-in (move) task is in progress
+        var taskMeta = storeService.getTaskInfoMeta();
+        if (taskMeta.hasSplitTaskDoing() || taskMeta.hasMoveTaskDoing()) {
+            throw new PDException(1001, "split or combine task is processing, 
please try later!");
+        }
+
+        // Exit while data migration (shard balancing) is in progress
+        if (Objects.equals(kvService.get(BALANCE_SHARD_KEY), "DOING")) {
+            throw new PDException(1001, "balance shard is processing, please 
try later!");
+        }
+
+        if (shardGroups.size() == 0) {
+            return results;
+        }
+
+        Map<Long, Integer> storeShardCount = new HashMap<>();
+
+        shardGroups.forEach(group -> {
+            group.getShardsList().forEach(shard -> {
+                storeShardCount.put(shard.getStoreId(),
+                                    
storeShardCount.getOrDefault(shard.getStoreId(), 0) + 1);
+            });
+        });
+
+        log.info("balancePartitionLeader, shard group size: {}, by store: {}", 
shardGroups.size(),
+                 storeShardCount);
+
+        // Stable ordering by target count and store id (per the original note; the comparator is defined elsewhere)
+        PriorityQueue<KVPair<Long, Integer>> targetCount =
+                new PriorityQueue<>(kvPairComparatorDesc);
+
+        var sortedGroups = storeShardCount.entrySet().stream()
+                                          .map(entry -> new 
KVPair<>(entry.getKey(),
+                                                                     
entry.getValue()))
+                                          .sorted(kvPairComparatorAsc)
+                                          .collect(Collectors.toList());
+        int sum = 0;
+
+        for (int i = 0; i < sortedGroups.size() - 1; i++) {
+            // at least one
+            int v = Math.max(
+                    sortedGroups.get(i).getValue() / 
pdConfig.getPartition().getShardCount(), 1);
+            targetCount.add(new KVPair<>(sortedGroups.get(i).getKey(), v));
+            sum += v;
+        }
+        // The last store takes the remainder so the total stays correct when the division is inexact
+        targetCount.add(new KVPair<>(sortedGroups.get(sortedGroups.size() - 
1).getKey(),
+                                     shardGroups.size() - sum));
+        log.info("target count: {}", targetCount);
+
+        for (var group : shardGroups) {
+            var map = group.getShardsList().stream()
+                           .collect(Collectors.toMap(Metapb.Shard::getStoreId, 
shard -> shard));
+            var tmpList = new ArrayList<KVPair<Long, Integer>>();
+            // With many stores the group may not contain the polled store id; park unsuitable stores in a temporary list until a suitable one is found
+            while (!targetCount.isEmpty()) {
+                var pair = targetCount.poll();
+                var storeId = pair.getKey();
+                if (map.containsKey(storeId)) {
+                    if (map.get(storeId).getRole() != Metapb.ShardRole.Leader) 
{
+                        log.info("shard group{}, store id:{}, set to leader", 
group.getId(),
+                                 storeId);
+                        partitionService.transferLeader(group.getId(), 
map.get(storeId));
+                        results.put(group.getId(), storeId);
+                    } else {
+                        log.info("shard group {}, store id :{}, is leader, no 
need change",
+                                 group.getId(), storeId);
+                    }
+
+                    if (pair.getValue() > 1) {
+                        // count -1
+                        pair.setValue(pair.getValue() - 1);
+                        tmpList.add(pair);
+                    }
+                    // Found a match; this group is done
+                    break;
+                } else {
+                    tmpList.add(pair);
+                }
+            }
+            targetCount.addAll(tmpList);
+        }
+
+        return results;
+    }
+
+    private long getMaxIndexGap(Map<Integer, Map<Long, Long>> 
committedIndexMap, int partitionId) {
+        long maxGap = Long.MAX_VALUE; // returned as-is when there is no committed-index data for the partition
+        if (committedIndexMap == null || 
!committedIndexMap.containsKey(partitionId)) {
+            return maxGap;
+        }
+        Map<Long, Long> shardMap = committedIndexMap.get(partitionId);
+        if (shardMap == null || shardMap.size() == 0) {
+            return maxGap;
+        }
+        List<Long> sortedList = new ArrayList<>();
+        shardMap.forEach((storeId, committedIndex) -> {
+            sortedList.add(committedIndex);
+        });
+        // List sorted in descending order
+        sortedList.sort(Comparator.reverseOrder());
+        maxGap = sortedList.get(0) - sortedList.get(sortedList.size() - 1); // largest minus smallest committed index
+        return maxGap;
+    }
+
+    /**
+     * Perform a partition split, either automatic or manual.
+     * <p>
+     * In {@code Auto} mode this delegates to {@link #autoSplitPartition()}.
+     * In any other mode each entry of {@code params} is turned into a
+     * (partition id, count) pair and the list is handed to
+     * {@code storeService.splitShardGroups}.
+     *
+     * @param mode   the operation mode; {@code Pdpb.OperationMode.Auto} selects
+     *               the automatic split
+     * @param params split parameters, only read when the mode is not Auto
+     * @return the result of {@code autoSplitPartition()} in Auto mode,
+     *         otherwise always {@code null}
+     * @throws PDException declared by this method; thrown directly by
+     *                     {@code autoSplitPartition()} in Auto mode
+     */
+    public List<Metapb.Partition> splitPartition(
+            Pdpb.OperationMode mode, List<Pdpb.SplitDataParam> params) throws 
PDException {
+
+        if (mode == Pdpb.OperationMode.Auto) {
+            return autoSplitPartition();
+        }
+
+        var list = params.stream()
+                         .map(param -> new KVPair<>(param.getPartitionId(), 
param.getCount()))
+                         .collect(Collectors.toList());
+
+        storeService.splitShardGroups(list);
+        return null;
+    }
+
+    /**
+     * Automatically split partitions so that each store reaches its maximum
+     * partition count.
+     * <p>
+     * Precondition (from the original note): after the split, the number of
+     * partitions per machine is less than partition.max-partitions-per-store.
+     * <p>
+     * The split count is
+     * {@code maxShardsPerStore * activeStores / (shardGroups * shardCount)};
+     * the cluster state is set to {@code Cluster_Offline} and every shard
+     * group is submitted to {@code storeService.splitShardGroups} with that
+     * count.
+     * <p>
+     * NOTE(review): the divisor is not guarded; presumably there is always at
+     * least one shard group and a non-zero shard count here. Confirm.
+     *
+     * @return always {@code null}, both when this node is not the leader and
+     *         after the split has been submitted
+     * @throws PDException with {@code Split_Partition_Doing_VALUE} when the
+     *                     cluster state is {@code Cluster_Offline}, with
+     *                     {@code Cluster_State_Forbid_Splitting_VALUE} for any
+     *                     other state that is not {@code Cluster_OK}, and with
+     *                     {@code Too_Many_Partitions_Per_Store_VALUE} when the
+     *                     computed split count is below 2
+     */
+    public List<Metapb.Partition> autoSplitPartition() throws PDException {
+        if (!isLeader()) {
+            return null;
+        }
+
+        if (Metapb.ClusterState.Cluster_OK != 
storeService.getClusterStats().getState()) {
+            if (Metapb.ClusterState.Cluster_Offline == 
storeService.getClusterStats().getState()) {
+                throw new 
PDException(Pdpb.ErrorType.Split_Partition_Doing_VALUE,
+                                      "The data is splitting");
+            } else {
+                throw new 
PDException(Pdpb.ErrorType.Cluster_State_Forbid_Splitting_VALUE,
+                                      "The current state of the cluster 
prohibits splitting data");
+            }
+        }
+
+        //For TEST
+        //   
pdConfig.getPartition().setMaxShardsPerStore(pdConfig.getPartition()
+        //   .getMaxShardsPerStore()*2);
+
+        // Compute the maximum split count the cluster can support
+        int splitCount = pdConfig.getPartition().getMaxShardsPerStore() *
+                         storeService.getActiveStores().size() /
+                         (storeService.getShardGroups().size() *
+                          pdConfig.getPartition().getShardCount());
+
+        if (splitCount < 2) {
+            throw new 
PDException(Pdpb.ErrorType.Too_Many_Partitions_Per_Store_VALUE,
+                                  "Too many partitions per store, 
partition.store-max-shard-count" +
+                                  " = "
+                                  + 
pdConfig.getPartition().getMaxShardsPerStore());
+        }
+
+        // No store has reached the maximum partition count yet, so split
+        log.info("Start to split partitions..., split count = {}", splitCount);
+
+        // Set the cluster state to offline
+        storeService.updateClusterStatus(Metapb.ClusterState.Cluster_Offline);
+        // Modify the default partition count
+        // 
pdConfig.getConfigService().setPartitionCount(storeService.getShardGroups().size()
 *
+        // splitCount);
+
+        var list = storeService.getShardGroups().stream()
+                               .map(shardGroup -> new 
KVPair<>(shardGroup.getId(), splitCount))
+                               .collect(Collectors.toList());
+        storeService.splitShardGroups(list);
+
+        return null;
+    }
+
+    /**
+     * Handle a task status report sent by a store.
+     * <p>
+     * Dispatches on the task type to the matching {@code partitionService}
+     * handler (split, move or clean partition); any other type is ignored.
+     * According to the original note, a partition state change leads to the
+     * state of its ShardGroup, the graph and the whole cluster being
+     * recomputed; that logic lives in the handlers and is not visible here.
+     * <p>
+     * Every exception raised by a handler is caught and logged, never
+     * rethrown.
+     * <p>
+     * NOTE(review): the exception is bound to the first {} placeholder and the
+     * task is the last argument, so presumably only the exception's string
+     * form is logged and the stack trace is lost. Confirm against the SLF4J
+     * throwable handling.
+     *
+     * @param task the task reported by the store
+     */
+    public void reportTask(MetaTask.Task task) {
+        try {
+            switch (task.getType()) {
+                case Split_Partition:
+                    partitionService.handleSplitTask(task);
+                    break;
+                case Move_Partition:
+                    partitionService.handleMoveTask(task);
+                    break;
+                case Clean_Partition:
+                    partitionService.handleCleanPartitionTask(task);
+                    break;
+                default:
+                    break;
+            }
+        } catch (Exception e) {
+            log.error("Report task exception {}, {}", e, task);
+        }
+    }
+
+    /**
+     * Run a compaction on RocksDB.
+     * <p>
+     * Calls {@code storeService.shardGroupsDbCompaction} once per shard group
+     * returned by {@code storeService.getShardGroups()}.
+     *
+     * @param tableName passed through unchanged to
+     *                  {@code shardGroupsDbCompaction}
+     * @return {@code false} when this node is not the leader, {@code true}
+     *         after the call has been issued for every shard group
+     * @throws PDException declared by this method; presumably propagated from
+     *                     the storeService calls
+     */
+    public Boolean dbCompaction(String tableName) throws PDException {
+        if (!isLeader()) {
+            return false;
+        }
+
+        for (Metapb.ShardGroup shardGroup : storeService.getShardGroups()) {
+            storeService.shardGroupsDbCompaction(shardGroup.getId(), 
tableName);
+        }
+
+        //
+        return true;
+    }
+
+    /**
+     * Determine whether all partitions of a store can be moved out, and return
+     * the verdict together with a migration plan.
+     * <p>
+     * Each partition on the source store is assigned to the active store with
+     * the fewest partitions that does not already hold the partition and whose
+     * remaining disk space (bytes, divided by 1024) is at least the
+     * partition's recorded size.
+     * <p>
+     * Keys of the returned map:
+     * <ul>
+     * <li>"current_store_is_online": set to true only when the source store is
+     * among the active stores, otherwise absent</li>
+     * <li>"flag": true when every partition received a target store</li>
+     * <li>"movedPartitions": partition id to (source store id, target store
+     * id) when "flag" is true, otherwise null</li>
+     * </ul>
+     * A {@code PDException} raised while reading a shard list inside the
+     * lambda is rethrown wrapped in a {@code RuntimeException}.
+     *
+     * @param sourceStore the store whose partitions should be moved out
+     * @return the result map described above, or {@code null} when this node
+     *         is not the leader
+     * @throws PDException declared by this method
+     */
+    public Map<String, Object> canAllPartitionsMovedOut(Metapb.Store 
sourceStore) throws
+                                                                               
   PDException {
+        if (!isLeader()) {
+            return null;
+        }
+        // Analyze whether the partitions on a store can be completely moved out
+        Map<String, Object> resultMap = new HashMap<>();
+        // Holds the partitions on the source store: StoreId -> (PartitionID, ShardRole)
+        Map<Long, Map<Integer, Metapb.ShardRole>> sourcePartitionMap = new 
HashMap<>();
+        sourcePartitionMap.put(sourceStore.getId(), new HashMap<>());
+        // Holds the partitions on the other active stores: StoreId -> (PartitionID, ShardRole)
+        Map<Long, Map<Integer, Metapb.ShardRole>> otherPartitionMap = new 
HashMap<>();
+        Map<Long, Long> availableDiskSpace = new HashMap<>(); // remaining disk space of each store
+        Map<Integer, Long> partitionDataSize = new HashMap<>(); // data size of each partition to be migrated
+
+        storeService.getActiveStores().forEach(store -> {
+            if (store.getId() != sourceStore.getId()) {
+                otherPartitionMap.put(store.getId(), new HashMap<>());
+                // Record the remaining disk space of the other stores, in bytes
+                availableDiskSpace.put(store.getId(), 
store.getStats().getAvailable());
+            } else {
+                resultMap.put("current_store_is_online", true);
+            }
+        });
+        // Sum the data size of the partitions to migrate (taken from the store stats, in KB)
+        for (Metapb.GraphStats graphStats : 
sourceStore.getStats().getGraphStatsList()) {
+            partitionDataSize.put(graphStats.getPartitionId(),
+                                  
partitionDataSize.getOrDefault(graphStats.getPartitionId(), 0L)
+                                  + graphStats.getApproximateSize());
+        }
+        // Populate sourcePartitionMap and otherPartitionMap
+        partitionService.getPartitions().forEach(partition -> {
+            try {
+                storeService.getShardList(partition.getId()).forEach(shard -> {
+                    long storeId = shard.getStoreId();
+                    if (storeId == sourceStore.getId()) {
+                        sourcePartitionMap.get(storeId).put(partition.getId(), 
shard.getRole());
+                    } else {
+                        if (otherPartitionMap.containsKey(storeId)) {
+                            
otherPartitionMap.get(storeId).put(partition.getId(), shard.getRole());
+                        }
+                    }
+
+                });
+            } catch (PDException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        // Collect the partitions to move out, i.e. all partitions on the source store
+        Map<Integer, KVPair<Long, Long>> movedPartitions = new HashMap<>();
+        for (Map.Entry<Integer, Metapb.ShardRole> entry : 
sourcePartitionMap.get(
+                sourceStore.getId()).entrySet()) {
+            movedPartitions.put(entry.getKey(), new 
KVPair<>(sourceStore.getId(), 0L));
+        }
+        // Count the partitions of the other stores in a min-heap so that stores with fewer partitions are always considered first
+        PriorityQueue<KVPair<Long, Integer>> minHeap = new 
PriorityQueue<>(otherPartitionMap.size(),
+                                                                           
(o1, o2) -> o1.getValue()
+                                                                               
          .compareTo(
+                                                                               
                  o2.getValue()));
+        otherPartitionMap.forEach((storeId, shards) -> {
+            minHeap.add(new KVPair(storeId, shards.size())); // NOTE(review): raw KVPair type; new KVPair<>(...) would avoid the unchecked warning
+        });
+        // Walk the partitions to migrate, preferring stores with fewer partitions
+        Iterator<Map.Entry<Integer, KVPair<Long, Long>>> moveIterator =
+                movedPartitions.entrySet().iterator();
+        while (moveIterator.hasNext()) {
+            Map.Entry<Integer, KVPair<Long, Long>> moveEntry = 
moveIterator.next();
+            int partitionId = moveEntry.getKey();
+            List<KVPair<Long, Integer>> tmpList = new ArrayList<>(); // records the elements already popped from the priority queue
记录已经弹出优先队列的元素
+            while (minHeap.size() > 0) {
+                KVPair<Long, Integer> pair = minHeap.poll(); // pop the head element
+                long storeId = pair.getKey();
+                int partitionCount = pair.getValue();
+                Map<Integer, Metapb.ShardRole> shards = 
otherPartitionMap.get(storeId);
+                final int unitRate = 1024; // conversion factor between the different storage units
+                if ((!shards.containsKey(partitionId)) && (
+                        availableDiskSpace.getOrDefault(storeId, 0L) / 
unitRate >=
+                        partitionDataSize.getOrDefault(partitionId, 0L))) {
+                    // Migrate when the target store does not hold this partition and has enough free space for it
+                    moveEntry.getValue().setValue(storeId); // set the target store of the move
+                    log.info("plan to move partition {} to store {}, " +
+                             "available disk space {}, current 
partitionSize:{}",
+                             partitionId,
+                             storeId,
+                             availableDiskSpace.getOrDefault(storeId, 0L) / 
unitRate,
+                             partitionDataSize.getOrDefault(partitionId, 0L)
+                    );
+                    // Update the expected remaining space of this store
+                    availableDiskSpace.put(storeId, 
availableDiskSpace.getOrDefault(storeId, 0L)
+                                                    - 
partitionDataSize.getOrDefault(partitionId,
+                                                                               
      0L) *
+                                                      unitRate);
+                    // Update this store's partition count in the bookkeeping
+                    partitionCount += 1;
+                    pair.setValue(partitionCount);
+                    tmpList.add(pair);
+                    break;
+                } else {
+                    tmpList.add(pair);
+                }
+            }
+            minHeap.addAll(tmpList);
+        }
+        // Check whether any partition is left without a target store
+        List<Integer> remainPartitions = new ArrayList<>();
+        movedPartitions.forEach((partId, storePair) -> {
+            if (storePair.getValue() == 0L) {
+                remainPartitions.add(partId);
+            }
+        });
+        if (remainPartitions.size() > 0) {
+            resultMap.put("flag", false);
+            resultMap.put("movedPartitions", null);
+        } else {
+            resultMap.put("flag", true);
+            resultMap.put("movedPartitions", movedPartitions);
+        }
+        return resultMap;
+
+    }
+
+    public Map<Integer, KVPair<Long, Long>> movePartitions(
+            Map<Integer, KVPair<Long, Long>> movedPartitions) {
+        if (!isLeader()) {
+            return null;
+        }
+        // Start the migration
+        log.info("begin move partitions:");
+        movedPartitions.forEach((partId, storePair) -> {
+            // Only move when both the source and the target store ID are non-zero; other entries are skipped silently
+            if (storePair.getKey() > 0 && storePair.getValue() > 0) {
+                partitionService.movePartitionsShard(partId, 
storePair.getKey(),
+                                                     storePair.getValue());
+            }
+        });
+        return movedPartitions;
+    }
+

Review Comment:
   useless blank line



##########
hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/PartitionMeta.java:
##########
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.meta;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hugegraph.pd.common.PDException;
+import org.apache.hugegraph.pd.common.PartitionCache;
+import org.apache.hugegraph.pd.config.PDConfig;
+import org.apache.hugegraph.pd.grpc.Metapb;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * 分区信息管理
+ */
+@Slf4j
+public class PartitionMeta extends MetadataRocksDBStore {

Review Comment:
   can we let PartitionMeta hold a MetadataRocksDBStore instead?



##########
hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/ConfigMetaStore.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.meta;
+
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.hugegraph.pd.common.PDException;
+import org.apache.hugegraph.pd.config.PDConfig;
+import org.apache.hugegraph.pd.grpc.Metapb;
+
+public class ConfigMetaStore extends MetadataRocksDBStore {

Review Comment:
   keep a consistent naming style XxMeta or XxMetaStore



##########
hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/QueueStore.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.meta;
+
+import java.util.List;
+
+import org.apache.hugegraph.pd.common.HgAssert;
+import org.apache.hugegraph.pd.common.PDException;
+import org.apache.hugegraph.pd.config.PDConfig;
+import org.apache.hugegraph.pd.grpc.Metapb;
+import org.apache.hugegraph.pd.raft.RaftEngine;
+import org.apache.hugegraph.pd.store.RaftKVStore;
+
+public class QueueStore extends MetadataRocksDBStore {

Review Comment:
   can we keep a consistent naming style: QueueMetaStore or QueueMeta



##########
hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/MetadataStoreBase.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.meta;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hugegraph.pd.common.PDException;
+import org.apache.hugegraph.pd.grpc.Pdpb;
+import org.apache.hugegraph.pd.store.KV;
+
+import com.google.protobuf.Parser;
+
+public abstract class MetadataStoreBase {

Review Comment:
   prefer `interface MetadataStore`



##########
hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/ConfigService.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd;
+
+import java.util.List;
+
+import org.apache.hugegraph.pd.common.PDException;
+import org.apache.hugegraph.pd.config.PDConfig;
+import org.apache.hugegraph.pd.grpc.Metapb;
+import org.apache.hugegraph.pd.meta.ConfigMetaStore;
+import org.apache.hugegraph.pd.meta.MetadataFactory;
+import org.apache.hugegraph.pd.raft.RaftEngine;
+import org.apache.hugegraph.pd.raft.RaftStateListener;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class ConfigService implements RaftStateListener {
+
+    private final ConfigMetaStore meta;
+    private PDConfig pdConfig;
+
+    public ConfigService(PDConfig config) {
+        this.pdConfig = config;
+        config.setConfigService(this); // NOTE(review): hands out `this` before `meta` is assigned on the next line
+        meta = MetadataFactory.newConfigMeta(config);
+    }
+
+    public Metapb.PDConfig getPDConfig(long version) throws PDException {
+        return this.meta.getPdConfig(version); // plain delegation to the config meta store
+    }
+
+    public Metapb.PDConfig getPDConfig() throws PDException {
+        return this.meta.getPdConfig(0); // same lookup as getPDConfig(long) with the version argument fixed to 0
+    }
+
+    public Metapb.PDConfig setPDConfig(Metapb.PDConfig mConfig) throws 
PDException {
+        Metapb.PDConfig oldCfg = getPDConfig(); // base onto which mConfig is merged below
+        Metapb.PDConfig.Builder builder = oldCfg.toBuilder().mergeFrom(mConfig)
+                                                
.setVersion(oldCfg.getVersion() + 1)
+                                                
.setTimestamp(System.currentTimeMillis());
+        mConfig = this.meta.setPdConfig(builder.build()); // hand the merged config (version + 1, fresh timestamp) to the meta store
+        log.info("PDConfig has been modified, new PDConfig is {}", mConfig);
+        updatePDConfig(mConfig);
+        return mConfig; // the value returned by meta.setPdConfig, not the caller's argument
+    }
+
+    public List<Metapb.GraphSpace> getGraphSpace(String graphSpaceName) throws 
PDException {
+        return this.meta.getGraphSpace(graphSpaceName); // plain delegation to the config meta store
+    }
+
+    public Metapb.GraphSpace setGraphSpace(Metapb.GraphSpace graphSpace) 
throws PDException { // stamps the current time on a copy before passing it to the meta store
+        return this.meta.setGraphSpace(graphSpace.toBuilder()
+                                                 
.setTimestamp(System.currentTimeMillis())
+                                                 .build());
+    }
+
+    /**
+     * 从存储中读取配置项,并覆盖全局的PDConfig对象

Review Comment:
   to be translated



##########
hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/IdMetaStore.java:
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.meta;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hugegraph.pd.common.PDException;
+import org.apache.hugegraph.pd.config.PDConfig;
+import org.apache.hugegraph.pd.store.KV;
+
+import com.caucho.hessian.io.Hessian2Input;
+import com.caucho.hessian.io.Hessian2Output;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * 自增id的实现类
+ */
+@Slf4j
+public class IdMetaStore extends MetadataRocksDBStore {

Review Comment:
   keep a consistent naming style XxMeta or XxMetaStore 



##########
hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/meta/LogMeta.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.meta;
+
+import java.util.List;
+
+import org.apache.hugegraph.pd.common.PDException;
+import org.apache.hugegraph.pd.config.PDConfig;
+import org.apache.hugegraph.pd.grpc.Metapb;
+
+public class LogMeta extends MetadataRocksDBStore {

Review Comment:
   keep a consistent naming style XxMeta or XxMetaStore



##########
hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/raft/RaftEngine.java:
##########
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd.raft;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hugegraph.pd.common.PDException;
+import org.apache.hugegraph.pd.config.PDConfig;
+import org.apache.hugegraph.pd.grpc.Metapb;
+import org.apache.hugegraph.pd.grpc.Pdpb;
+
+import com.alipay.sofa.jraft.JRaftUtils;
+import com.alipay.sofa.jraft.Node;
+import com.alipay.sofa.jraft.RaftGroupService;
+import com.alipay.sofa.jraft.ReplicatorGroup;
+import com.alipay.sofa.jraft.Status;
+import com.alipay.sofa.jraft.conf.Configuration;
+import com.alipay.sofa.jraft.core.Replicator;
+import com.alipay.sofa.jraft.entity.PeerId;
+import com.alipay.sofa.jraft.entity.Task;
+import com.alipay.sofa.jraft.error.RaftError;
+import com.alipay.sofa.jraft.option.NodeOptions;
+import com.alipay.sofa.jraft.option.RaftOptions;
+import com.alipay.sofa.jraft.option.RpcOptions;
+import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory;
+import com.alipay.sofa.jraft.rpc.RpcServer;
+import com.alipay.sofa.jraft.util.Endpoint;
+import com.alipay.sofa.jraft.util.ThreadId;
+import com.alipay.sofa.jraft.util.internal.ThrowUtil;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class RaftEngine {
+
+    private static final RaftEngine INSTANCE = new RaftEngine();
+    private final RaftStateMachine stateMachine;
+    private PDConfig.Raft config;
+    private RaftGroupService raftGroupService;
+    private RpcServer rpcServer;
+    private Node raftNode;
+    private RaftRpcClient raftRpcClient;
+
+    public RaftEngine() {
+        this.stateMachine = new RaftStateMachine();
+    }
+
+    public static RaftEngine getInstance() {
+        return INSTANCE;
+    }
+
+    public boolean init(PDConfig.Raft config) {
+        if (this.raftNode != null) {
+            return false;
+        }
+        this.config = config;
+
+        raftRpcClient = new RaftRpcClient();
+        raftRpcClient.init(new RpcOptions());
+
+        String groupId = "pd_raft";
+        String raftPath = config.getDataPath() + "/" + groupId;
+        new File(raftPath).mkdirs();
+
+        new File(config.getDataPath()).mkdirs();
+        Configuration initConf = new Configuration();
+        initConf.parse(config.getPeersList());
+        if (config.isEnable() && config.getPeersList().length() < 3) {
+            log.error("The RaftEngine parameter is incorrect." +
+                      " When RAFT is enabled, the number of peers " + "cannot 
be less than 3");
+        }
+        // Set the Node options, including the log storage path and the state machine instance
+        NodeOptions nodeOptions = new NodeOptions();
+        nodeOptions.setFsm(stateMachine);
+        nodeOptions.setEnableMetrics(true);
+        // Log path
+        nodeOptions.setLogUri(raftPath + "/log");
+        // Raft metadata path
+        nodeOptions.setRaftMetaUri(raftPath + "/meta");
+        // Snapshot path
+        nodeOptions.setSnapshotUri(raftPath + "/snapshot");
+        // Initial cluster configuration
+        nodeOptions.setInitialConf(initConf);
+        // Snapshot interval
+        nodeOptions.setSnapshotIntervalSecs(config.getSnapshotInterval());
+
+        nodeOptions.setRpcConnectTimeoutMs(config.getRpcTimeout());
+        nodeOptions.setRpcDefaultTimeout(config.getRpcTimeout());
+        nodeOptions.setRpcInstallSnapshotTimeout(config.getRpcTimeout());
+        // Set the raft options (NOTE(review): raftOptions is not used afterwards in this method)
+        RaftOptions raftOptions = nodeOptions.getRaftOptions();
+
+        nodeOptions.setEnableMetrics(true);
+
+        final PeerId serverId = JRaftUtils.getPeerId(config.getAddress());
+
+        rpcServer = createRaftRpcServer(config.getAddress());
+        // Build the raft group service and start raft
+        this.raftGroupService =
+                new RaftGroupService(groupId, serverId, nodeOptions, 
rpcServer, true);
+        this.raftNode = raftGroupService.start(false);
+        log.info("RaftEngine start successfully: id = {}, peers list = {}", 
groupId,
+                 nodeOptions.getInitialConf().getPeers());
+        return this.raftNode != null;
+    }
+
+    /**
+     * Creates the raft rpc server, used for communication between pd nodes.
+     * The server is created for the endpoint parsed from {@code raftAddr},
+     * this engine is registered with it through RaftRpcProcessor, and
+     * {@code init(null)} is called on it before it is returned.
+     *
+     * @param raftAddr raft address that is parsed into the server endpoint
+     * @return the rpc server created for that endpoint
+     */
+    private RpcServer createRaftRpcServer(String raftAddr) {
+        Endpoint endpoint = JRaftUtils.getEndPoint(raftAddr);
+        RpcServer rpcServer = 
RaftRpcServerFactory.createRaftRpcServer(endpoint);
+        RaftRpcProcessor.registerProcessor(rpcServer, this);
+        rpcServer.init(null);
+        return rpcServer;
+    }
+
+    public void shutDown() {
+        if (this.raftGroupService != null) {
+            this.raftGroupService.shutdown();
+            try {
+                this.raftGroupService.join();
+            } catch (final InterruptedException e) {
+                this.raftNode = null;
+                ThrowUtil.throwException(e);
+            }
+            this.raftGroupService = null;
+        }
+        if (this.rpcServer != null) {
+            this.rpcServer.shutdown();
+            this.rpcServer = null;
+        }
+        if (this.raftNode != null) {
+            this.raftNode.shutdown();
+        }
+        this.raftNode = null;
+    }
+
+    public boolean isLeader() {
+        return this.raftNode.isLeader(true);
+    }
+
+    /**
+     * Adds a Raft task; grpc sends data to raft through this interface.
+     * When this node is not the leader the task is not applied: its done
+     * callback is cast to KVStoreClosure, given a NOT_LEADER error, and run
+     * with an EPERM "Not leader" status. Otherwise the task is applied to
+     * the raft node.
+     *
+     * @param task the task to apply
+     */
+    public void addTask(Task task) {
+        if (!isLeader()) {
+            KVStoreClosure closure = (KVStoreClosure) task.getDone();
+            
closure.setError(Pdpb.Error.newBuilder().setType(Pdpb.ErrorType.NOT_LEADER).build());
+            closure.run(new Status(RaftError.EPERM, "Not leader"));
+            return;
+        }
+        this.raftNode.apply(task);
+    }
+
+    public void addStateListener(RaftStateListener listener) {
+        this.stateMachine.addStateListener(listener);
+    }
+
+    public void addTaskHandler(RaftTaskHandler handler) {
+        this.stateMachine.addTaskHandler(handler);
+    }
+
+    public PDConfig.Raft getConfig() {
+        return this.config;
+    }
+
+    public PeerId getLeader() {
+        return raftNode.getLeaderId();
+    }
+
+    /**
+     * 向 leader 发消息,获取 grpc 地址;
+     */
+    public String getLeaderGrpcAddress() throws ExecutionException, 
InterruptedException {

Review Comment:
   please rename grpc to rpc?



##########
hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/PartitionService.java:
##########
@@ -0,0 +1,1562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hugegraph.pd.common.KVPair;
+import org.apache.hugegraph.pd.common.PDException;
+import org.apache.hugegraph.pd.common.PartitionUtils;
+import org.apache.hugegraph.pd.config.PDConfig;
+import org.apache.hugegraph.pd.grpc.MetaTask;
+import org.apache.hugegraph.pd.grpc.Metapb;
+import org.apache.hugegraph.pd.grpc.Pdpb;
+import org.apache.hugegraph.pd.grpc.pulse.ChangeShard;
+import org.apache.hugegraph.pd.grpc.pulse.CleanPartition;
+import org.apache.hugegraph.pd.grpc.pulse.CleanType;
+import org.apache.hugegraph.pd.grpc.pulse.ConfChangeType;
+import org.apache.hugegraph.pd.grpc.pulse.DbCompaction;
+import org.apache.hugegraph.pd.grpc.pulse.MovePartition;
+import org.apache.hugegraph.pd.grpc.pulse.PartitionKeyRange;
+import org.apache.hugegraph.pd.grpc.pulse.SplitPartition;
+import org.apache.hugegraph.pd.grpc.pulse.TransferLeader;
+import org.apache.hugegraph.pd.meta.MetadataFactory;
+import org.apache.hugegraph.pd.meta.PartitionMeta;
+import org.apache.hugegraph.pd.meta.TaskInfoMeta;
+import org.apache.hugegraph.pd.raft.RaftStateListener;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * 分区管理
+ */
+@Slf4j
+public class PartitionService implements RaftStateListener {
+
+    private final long Partition_Version_Skip = 0x0F;
+    private final StoreNodeService storeService;

Review Comment:
   We could also keep the name storeNodeService to avoid ambiguity.



##########
hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/LogService.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd;
+
+import java.util.List;
+
+import org.apache.hugegraph.pd.common.PDException;
+import org.apache.hugegraph.pd.config.PDConfig;
+import org.apache.hugegraph.pd.grpc.Metapb;
+import org.apache.hugegraph.pd.meta.LogMeta;
+import org.apache.hugegraph.pd.meta.MetadataFactory;
+import org.springframework.stereotype.Service;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.GeneratedMessageV3;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@Service
+public class LogService {

Review Comment:
   Can we make the name more specific, e.g. XxLogService?



##########
hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/ConfigService.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd;
+
+import java.util.List;
+
+import org.apache.hugegraph.pd.common.PDException;
+import org.apache.hugegraph.pd.config.PDConfig;
+import org.apache.hugegraph.pd.grpc.Metapb;
+import org.apache.hugegraph.pd.meta.ConfigMetaStore;
+import org.apache.hugegraph.pd.meta.MetadataFactory;
+import org.apache.hugegraph.pd.raft.RaftEngine;
+import org.apache.hugegraph.pd.raft.RaftStateListener;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class ConfigService implements RaftStateListener {
+
+    private final ConfigMetaStore meta;
+    private PDConfig pdConfig;
+
+    public ConfigService(PDConfig config) {
+        this.pdConfig = config;
+        config.setConfigService(this);
+        meta = MetadataFactory.newConfigMeta(config);
+    }
+
+    public Metapb.PDConfig getPDConfig(long version) throws PDException {
+        return this.meta.getPdConfig(version);
+    }
+
+    public Metapb.PDConfig getPDConfig() throws PDException {
+        return this.meta.getPdConfig(0);
+    }
+
+    public Metapb.PDConfig setPDConfig(Metapb.PDConfig mConfig) throws 
PDException {
+        Metapb.PDConfig oldCfg = getPDConfig();
+        Metapb.PDConfig.Builder builder = oldCfg.toBuilder().mergeFrom(mConfig)
+                                                
.setVersion(oldCfg.getVersion() + 1)
+                                                
.setTimestamp(System.currentTimeMillis());
+        mConfig = this.meta.setPdConfig(builder.build());
+        log.info("PDConfig has been modified, new PDConfig is {}", mConfig);
+        updatePDConfig(mConfig);
+        return mConfig;
+    }
+
+    public List<Metapb.GraphSpace> getGraphSpace(String graphSpaceName) throws 
PDException {
+        return this.meta.getGraphSpace(graphSpaceName);
+    }
+
+    public Metapb.GraphSpace setGraphSpace(Metapb.GraphSpace graphSpace) 
throws PDException {
+        return this.meta.setGraphSpace(graphSpace.toBuilder()
+                                                 
.setTimestamp(System.currentTimeMillis())
+                                                 .build());
+    }
+
+    /**
+     * 从存储中读取配置项,并覆盖全局的PDConfig对象
+     *
+     * @return
+     */
+    public PDConfig loadConfig() {
+        try {
+            Metapb.PDConfig mConfig = this.meta.getPdConfig(0);

Review Comment:
   Prefer to also keep the name pdConfig here.



##########
hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/KvService.java:
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd;
+
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hugegraph.pd.common.PDException;
+import org.apache.hugegraph.pd.config.PDConfig;
+import org.apache.hugegraph.pd.grpc.kv.Kv;
+import org.apache.hugegraph.pd.grpc.kv.V;
+import org.apache.hugegraph.pd.meta.MetadataKeyHelper;
+import org.apache.hugegraph.pd.meta.MetadataRocksDBStore;
+import org.apache.hugegraph.pd.store.KV;
+import org.springframework.stereotype.Service;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ *
+ **/
+@Slf4j
+@Service
+public class KvService {
+
+    public static final char KV_DELIMITER = '@';
+    // TODO 主前缀之后,增加类名做区分
+    private static final String TTL_PREFIX = "T";
+    private static final String KV_PREFIX = "K";
+    private static final String LOCK_PREFIX = "L";
+    private static final String KV_PREFIX_DELIMITER = KV_PREFIX + KV_DELIMITER;
+    private static final byte[] EMPTY_VALUE = new byte[0];
+    private final MetadataRocksDBStore meta;

Review Comment:
   Prefer to rename meta to store; it's easier to understand.



##########
hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/KvService.java:
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd;
+
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hugegraph.pd.common.PDException;
+import org.apache.hugegraph.pd.config.PDConfig;
+import org.apache.hugegraph.pd.grpc.kv.Kv;
+import org.apache.hugegraph.pd.grpc.kv.V;
+import org.apache.hugegraph.pd.meta.MetadataKeyHelper;
+import org.apache.hugegraph.pd.meta.MetadataRocksDBStore;
+import org.apache.hugegraph.pd.store.KV;
+import org.springframework.stereotype.Service;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ *
+ **/
+@Slf4j
+@Service
+public class KvService {
+
+    public static final char KV_DELIMITER = '@';
+    // TODO 主前缀之后,增加类名做区分
+    private static final String TTL_PREFIX = "T";
+    private static final String KV_PREFIX = "K";
+    private static final String LOCK_PREFIX = "L";
+    private static final String KV_PREFIX_DELIMITER = KV_PREFIX + KV_DELIMITER;
+    private static final byte[] EMPTY_VALUE = new byte[0];
+    private final MetadataRocksDBStore meta;
+    private PDConfig pdConfig;
+
+    public KvService(PDConfig config) {
+        this.pdConfig = config;
+        meta = new MetadataRocksDBStore(config);
+    }
+
+    public static String getKey(Object... keys) {

Review Comment:
   Can we rename these static methods to `buildXX()`?



##########
hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/KvService.java:
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd;
+
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hugegraph.pd.common.PDException;
+import org.apache.hugegraph.pd.config.PDConfig;
+import org.apache.hugegraph.pd.grpc.kv.Kv;
+import org.apache.hugegraph.pd.grpc.kv.V;
+import org.apache.hugegraph.pd.meta.MetadataKeyHelper;
+import org.apache.hugegraph.pd.meta.MetadataRocksDBStore;
+import org.apache.hugegraph.pd.store.KV;
+import org.springframework.stereotype.Service;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ *
+ **/
+@Slf4j
+@Service
+public class KvService {
+
+    public static final char KV_DELIMITER = '@';
+    // TODO 主前缀之后,增加类名做区分
+    private static final String TTL_PREFIX = "T";
+    private static final String KV_PREFIX = "K";
+    private static final String LOCK_PREFIX = "L";
+    private static final String KV_PREFIX_DELIMITER = KV_PREFIX + KV_DELIMITER;
+    private static final byte[] EMPTY_VALUE = new byte[0];
+    private final MetadataRocksDBStore meta;
+    private PDConfig pdConfig;
+
+    public KvService(PDConfig config) {
+        this.pdConfig = config;
+        meta = new MetadataRocksDBStore(config);
+    }
+
+    public static String getKey(Object... keys) {
+        StringBuilder builder = MetadataKeyHelper.getStringBuilderHelper();
+        builder.append(KV_PREFIX).append(KV_DELIMITER);
+        for (Object key : keys) {
+            builder.append(key == null ? "" : key).append(KV_DELIMITER);
+        }
+        return builder.substring(0, builder.length() - 1);
+    }
+
+    public static byte[] getKeyBytes(Object... keys) {
+        String key = getKey(keys);
+        return key.getBytes(Charset.defaultCharset());
+    }
+
+    public static String getKeyWithoutPrefix(Object... keys) {
+        StringBuilder builder = MetadataKeyHelper.getStringBuilderHelper();
+        for (Object key : keys) {
+            builder.append(key == null ? "" : key).append(KV_DELIMITER);
+        }
+        return builder.substring(0, builder.length() - 1);
+    }
+
+    public static String getDelimiter() {
+        return String.valueOf(KV_DELIMITER);
+    }
+
+    public PDConfig getPdConfig() {
+        return pdConfig;
+    }
+
+    public void setPdConfig(PDConfig pdConfig) {
+        this.pdConfig = pdConfig;
+    }
+
+    public void put(String key, String value) throws PDException {
+        V storeValue = V.newBuilder().setValue(value).setTtl(0).build();
+        meta.put(getStoreKey(key), storeValue.toByteArray());
+        // log.warn("add key with key-{}:value-{}", key, value);
+    }
+
+    public void put(String key, String value, long ttl) throws PDException {
+        long curTime = System.currentTimeMillis();
+        curTime += ttl;
+        V storeValue = 
V.newBuilder().setValue(value).setSt(ttl).setTtl(curTime).build();
+        meta.put(getStoreKey(key), storeValue.toByteArray());
+        meta.put(getTTLStoreKey(key, curTime), EMPTY_VALUE);
+        // log.warn("add key with key-{}:value-{}:ttl-{}", key, value, ttl);
+    }
+
+    public String get(String key) throws PDException {
+        byte[] storeKey = getStoreKey(key);
+        return get(storeKey);
+    }
+
+    public String get(byte[] keyBytes) throws PDException {
+        byte[] bytes = meta.getOne(keyBytes);
+        String v = getValue(keyBytes, bytes);
+        return v;
+    }
+
+    private String getValue(byte[] keyBytes, byte[] valueBytes) throws 
PDException {
+        if (valueBytes == null || valueBytes.length == 0) {
+            return "";
+        }
+        try {
+            V v = V.parseFrom(valueBytes);
+            if (v.getTtl() == 0 || v.getTtl() >= System.currentTimeMillis()) {
+                return v.getValue();
+            } else {
+                meta.remove(keyBytes);
+                meta.remove(getTTLStoreKey(new String(keyBytes), v.getTtl()));
+            }
+        } catch (Exception e) {
+            log.error("parse value with error:{}", e.getMessage());
+            throw new PDException(-1, e.getMessage());

Review Comment:
   What does the -1 mean? Could we define a Status enum instead?



##########
hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/TaskScheduleService.java:
##########
@@ -0,0 +1,845 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.PriorityQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import org.apache.hugegraph.pd.common.KVPair;
+import org.apache.hugegraph.pd.common.PDException;
+import org.apache.hugegraph.pd.config.PDConfig;
+import org.apache.hugegraph.pd.grpc.MetaTask;
+import org.apache.hugegraph.pd.grpc.Metapb;
+import org.apache.hugegraph.pd.grpc.Pdpb;
+import org.apache.hugegraph.pd.meta.TaskInfoMeta;
+import org.apache.hugegraph.pd.raft.RaftEngine;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * 任务调度服务,定时检查Store、资源、分区的状态,及时迁移数据,错误节点
+ * 1、监测Store是否离线
+ * 2、监测Partition的副本是否正确
+ * 3、监测Partition的工作模式是否正确
+ * 4、监测Partition是否需要分裂,监测分裂是否完成
+ */
+@Slf4j
+public class TaskScheduleService {

Review Comment:
   It seems this is not a service; prefer to keep the TaskScheduler name, and
   also move it to a package like pd.task.



##########
hugegraph-pd/pom.xml:
##########
@@ -36,11 +36,10 @@
         <module>hg-pd-common</module>
         <module>hg-pd-client</module>
         <module>hg-pd-test</module>
+        <module>hg-pd-core</module>

Review Comment:
   We have already introduced sofa-rpc, so I'm not sure why we need hg-pd-grpc.



##########
hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/ShardGroupStatusListener.java:
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd;
+
+import org.apache.hugegraph.pd.grpc.Metapb;
+
+public interface ShardGroupStatusListener {

Review Comment:
   Can we move all the Listener files to package pd.listener, and move all the
   Service files to package pd.service?



##########
hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/StoreStatusListener.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd;
+
+import org.apache.hugegraph.pd.grpc.Metapb;
+
+public interface StoreStatusListener {

Review Comment:
   Does Store mean StoreNode here? If so, we can keep StoreNodeStatusListener.



##########
hugegraph-pd/hg-pd-core/src/main/java/org/apache/hugegraph/pd/KvService.java:
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hugegraph.pd;
+
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hugegraph.pd.common.PDException;
+import org.apache.hugegraph.pd.config.PDConfig;
+import org.apache.hugegraph.pd.grpc.kv.Kv;
+import org.apache.hugegraph.pd.grpc.kv.V;
+import org.apache.hugegraph.pd.meta.MetadataKeyHelper;
+import org.apache.hugegraph.pd.meta.MetadataRocksDBStore;
+import org.apache.hugegraph.pd.store.KV;
+import org.springframework.stereotype.Service;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ *
+ **/
+@Slf4j
+@Service
+public class KvService {
+
+    public static final char KV_DELIMITER = '@';
+    // TODO 主前缀之后,增加类名做区分
+    private static final String TTL_PREFIX = "T";
+    private static final String KV_PREFIX = "K";
+    private static final String LOCK_PREFIX = "L";
+    private static final String KV_PREFIX_DELIMITER = KV_PREFIX + KV_DELIMITER;
+    private static final byte[] EMPTY_VALUE = new byte[0];
+    private final MetadataRocksDBStore meta;
+    private PDConfig pdConfig;
+
+    public KvService(PDConfig config) {
+        this.pdConfig = config;
+        meta = new MetadataRocksDBStore(config);
+    }
+
+    public static String getKey(Object... keys) {
+        StringBuilder builder = MetadataKeyHelper.getStringBuilderHelper();
+        builder.append(KV_PREFIX).append(KV_DELIMITER);
+        for (Object key : keys) {
+            builder.append(key == null ? "" : key).append(KV_DELIMITER);
+        }
+        return builder.substring(0, builder.length() - 1);
+    }
+
+    public static byte[] getKeyBytes(Object... keys) {
+        String key = getKey(keys);
+        return key.getBytes(Charset.defaultCharset());
+    }
+
+    public static String getKeyWithoutPrefix(Object... keys) {
+        StringBuilder builder = MetadataKeyHelper.getStringBuilderHelper();
+        for (Object key : keys) {
+            builder.append(key == null ? "" : key).append(KV_DELIMITER);
+        }
+        return builder.substring(0, builder.length() - 1);
+    }
+
+    public static String getDelimiter() {
+        return String.valueOf(KV_DELIMITER);
+    }
+
+    public PDConfig getPdConfig() {
+        return pdConfig;
+    }
+
+    public void setPdConfig(PDConfig pdConfig) {
+        this.pdConfig = pdConfig;
+    }
+
+    public void put(String key, String value) throws PDException {
+        V storeValue = V.newBuilder().setValue(value).setTtl(0).build();
+        meta.put(getStoreKey(key), storeValue.toByteArray());
+        // log.warn("add key with key-{}:value-{}", key, value);
+    }
+
+    public void put(String key, String value, long ttl) throws PDException {
+        long curTime = System.currentTimeMillis();
+        curTime += ttl;
+        V storeValue = 
V.newBuilder().setValue(value).setSt(ttl).setTtl(curTime).build();
+        meta.put(getStoreKey(key), storeValue.toByteArray());
+        meta.put(getTTLStoreKey(key, curTime), EMPTY_VALUE);
+        // log.warn("add key with key-{}:value-{}:ttl-{}", key, value, ttl);
+    }
+
+    public String get(String key) throws PDException {
+        byte[] storeKey = getStoreKey(key);
+        return get(storeKey);
+    }
+
+    public String get(byte[] keyBytes) throws PDException {
+        byte[] bytes = meta.getOne(keyBytes);
+        String v = getValue(keyBytes, bytes);
+        return v;
+    }
+
+    private String getValue(byte[] keyBytes, byte[] valueBytes) throws 
PDException {
+        if (valueBytes == null || valueBytes.length == 0) {
+            return "";
+        }
+        try {
+            V v = V.parseFrom(valueBytes);
+            if (v.getTtl() == 0 || v.getTtl() >= System.currentTimeMillis()) {
+                return v.getValue();
+            } else {
+                meta.remove(keyBytes);
+                meta.remove(getTTLStoreKey(new String(keyBytes), v.getTtl()));
+            }
+        } catch (Exception e) {
+            log.error("parse value with error:{}", e.getMessage());

Review Comment:
   prefer LOG style



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to