This is an automated email from the ASF dual-hosted git repository. vgalaxies pushed a commit to branch trans-pd in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git
commit 5c5ce2f7218d2813a621ea90b21e84df02dc589e Author: VGalaxies <[email protected]> AuthorDate: Sun May 12 11:41:58 2024 +0800 translate pd client --- .../apache/hugegraph/pd/client/AbstractClient.java | 1 - .../pd/client/AbstractClientStubProxy.java | 2 +- .../apache/hugegraph/pd/client/ClientCache.java | 15 --- .../hugegraph/pd/client/DiscoveryClient.java | 16 +-- .../hugegraph/pd/client/DiscoveryClientImpl.java | 4 +- .../org/apache/hugegraph/pd/client/PDClient.java | 109 ++++++++++----------- .../org/apache/hugegraph/pd/client/PDConfig.java | 10 +- .../org/apache/hugegraph/pd/client/PDPulse.java | 3 +- 8 files changed, 66 insertions(+), 94 deletions(-) diff --git a/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/AbstractClient.java b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/AbstractClient.java index 874ef6f67..b83d7ba00 100644 --- a/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/AbstractClient.java +++ b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/AbstractClient.java @@ -164,7 +164,6 @@ public abstract class AbstractClient implements Closeable { log.error(method.getFullMethodName() + " exception, {}", e.getMessage()); if (e instanceof StatusRuntimeException) { if (retry < stubProxy.getHostCount()) { - // 网络不通,关掉之前连接,换host重新连接 synchronized (this) { stubProxy.setBlockingStub(null); } diff --git a/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/AbstractClientStubProxy.java b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/AbstractClientStubProxy.java index 6ee3fcb62..a0bb181b7 100644 --- a/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/AbstractClientStubProxy.java +++ b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/AbstractClientStubProxy.java @@ -42,7 +42,7 @@ public class AbstractClientStubProxy { public String nextHost() { String host = hostList.poll(); - 
hostList.offer(host); //移到尾部 + hostList.offer(host); return host; } diff --git a/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/ClientCache.java b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/ClientCache.java index 868f8fae3..0ebc28521 100644 --- a/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/ClientCache.java +++ b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/ClientCache.java @@ -105,13 +105,6 @@ public class ClientCache { return null; } - /** - * 根据key的hashcode返回分区信息 - * - * @param graphName - * @param code - * @return - */ public KVPair<Partition, Shard> getPartitionByCode(String graphName, long code) { try { GraphCache graph = initGraph(graphName); @@ -172,12 +165,6 @@ public class ClientCache { } } - /** - * 返回key所在的分区信息 - * - * @param key - * @return - */ public KVPair<Partition, Shard> getPartitionByKey(String graphName, byte[] key) { int code = PartitionUtils.calcHashcode(key); return getPartitionByCode(graphName, code); @@ -193,8 +180,6 @@ public class ClientCache { RangeMap<Long, Integer> range = graph.getRange(); graph.addPartition(partId, partition); if (p != null) { - // old [1-3) 被 [2-3)覆盖了。当 [1-3) 变成[1-2) 不应该删除原先的[1-3) - // 当确认老的 start, end 都是自己的时候,才可以删除老的. 
(即还没覆盖) if (Objects.equals(partition.getId(), range.get(partition.getStartKey())) && Objects.equals(partition.getId(), range.get(partition.getEndKey() - 1))) { range.remove(range.getEntry(partition.getStartKey()).getKey()); diff --git a/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/DiscoveryClient.java b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/DiscoveryClient.java index c307b9621..d280b1344 100644 --- a/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/DiscoveryClient.java +++ b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/DiscoveryClient.java @@ -45,10 +45,10 @@ public abstract class DiscoveryClient implements Closeable, Discoverable { private final Timer timer = new Timer("serverHeartbeat", true); private final AtomicBoolean requireResetStub = new AtomicBoolean(false); - protected int period; //心跳周期 + protected int period; LinkedList<String> pdAddresses = new LinkedList<>(); ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); - private volatile int currentIndex; // 当前在用pd地址位置 + private volatile int currentIndex; private int maxTime = 6; private ManagedChannel channel = null; private DiscoveryServiceGrpc.DiscoveryServiceBlockingStub registerStub; @@ -88,9 +88,6 @@ public abstract class DiscoveryClient implements Closeable, Discoverable { return null; } - /*** - * 按照pd列表重置stub - */ private void resetStub() { String errLog = null; for (int i = currentIndex + 1; i <= pdAddresses.size() + currentIndex; i++) { @@ -115,11 +112,6 @@ public abstract class DiscoveryClient implements Closeable, Discoverable { } } - /*** - * 按照某个pd的地址重置channel和stub - * @param singleAddress - * @throws PDException - */ private void resetChannel(String singleAddress) throws PDException { readWriteLock.writeLock().lock(); @@ -146,7 +138,7 @@ public abstract class DiscoveryClient implements Closeable, Discoverable { } /*** - * 获取注册节点信息 + * Obtain the registration node 
information * @param query * @return */ @@ -167,7 +159,7 @@ public abstract class DiscoveryClient implements Closeable, Discoverable { } /*** - * 启动心跳任务 + * Start the heartbeat task */ @Override public void scheduleTask() { diff --git a/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/DiscoveryClientImpl.java b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/DiscoveryClientImpl.java index 049ca17a1..4f76d5ac9 100644 --- a/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/DiscoveryClientImpl.java +++ b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/DiscoveryClientImpl.java @@ -28,10 +28,10 @@ import org.apache.hugegraph.pd.grpc.discovery.RegisterType; public class DiscoveryClientImpl extends DiscoveryClient { private final String id; - private final RegisterType type; // 心跳类型,备用 + private final RegisterType type; private final String version; private final String appName; - private final int times; // 心跳过期次数,备用 + private final int times; private final String address; private final Map labels; private final Consumer registerConsumer; diff --git a/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/PDClient.java b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/PDClient.java index 6c3eae425..b38610c49 100644 --- a/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/PDClient.java +++ b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/PDClient.java @@ -53,7 +53,7 @@ import io.grpc.stub.AbstractBlockingStub; import lombok.extern.slf4j.Slf4j; /** - * PD客户端实现类 + * PD client implementation class */ @Slf4j public class PDClient { @@ -78,7 +78,7 @@ public class PDClient { } /** - * 创建PDClient对象,并初始化stub + * Create a PD client object and initialize the stub * * @param config * @return @@ -210,7 +210,6 @@ public class PDClient { } private synchronized void closeStub(boolean closeWatcher) { - // TODO 
ManagedChannel 没有正常关闭 stubProxy.set(null); cache.reset(); @@ -308,7 +307,8 @@ public class PDClient { } /** - * Store注册,返回storeID,初次注册会返回新ID + * Store registration, the store ID will be returned, and the initial registration will + * return a new ID * * @param store * @return @@ -325,7 +325,7 @@ public class PDClient { } /** - * 根据storeId返回Store对象 + * Returns the Store object based on the store ID * * @param storeId * @return @@ -348,7 +348,7 @@ public class PDClient { } /** - * 更新Store信息,包括上下线等 + * Update the store information, including going online and offline * * @param store * @return @@ -368,7 +368,7 @@ public class PDClient { } /** - * 返回活跃的Store + * Returns the active stores * * @param graphName * @return @@ -400,7 +400,7 @@ public class PDClient { } /** - * 返回活跃的Store + * Returns the active stores * * @param graphName * @return @@ -418,7 +418,7 @@ public class PDClient { } /** - * Store心跳,定期调用,保持在线状态 + * Store heartbeat; call it periodically to keep the store online * * @param stats * @throws PDException @@ -452,7 +452,7 @@ public class PDClient { } /** - * 查询Key所属分区信息 + * Query the partition to which the key belongs * * @param graphName * @param key @@ -461,7 +461,6 @@ public class PDClient { */ public KVPair<Metapb.Partition, Metapb.Shard> getPartition(String graphName, byte[] key) throws PDException { - // 先查cache,cache没有命中,在调用PD KVPair<Metapb.Partition, Metapb.Shard> partShard = cache.getPartitionByKey(graphName, key); partShard = getKvPair(graphName, key, partShard); return partShard; @@ -477,7 +476,7 @@ public class PDClient { } /** - * 根据hashcode查询所属分区信息 + * Query the partition information based on the hashcode * * @param graphName * @param hashCode @@ -487,7 +486,6 @@ public class PDClient { public KVPair<Metapb.Partition, Metapb.Shard> getPartitionByCode(String graphName, long hashCode) throws PDException { - // 先查cache,cache没有命中,在调用PD KVPair<Metapb.Partition, Metapb.Shard> partShard = cache.getPartitionByCode(graphName, hashCode); if (partShard == null) { @@ -520,14 
+518,14 @@ public class PDClient { } /** - * 获取Key的哈希值 + * Obtain the hash value of the key */ public int keyToCode(String graphName, byte[] key) { return PartitionUtils.calcHashcode(key); } /** - * 根据分区id返回分区信息, RPC请求 + * Returns partition information for the given partition ID via an RPC request * * @param graphName * @param partId @@ -605,7 +603,7 @@ public class PDClient { } /** - * 返回startKey和endKey跨越的所有分区信息 + * Returns information about all partitions spanned by the start and end keys * * @param graphName * @param startKey @@ -626,8 +624,7 @@ public class PDClient { partitions.add(startPartShard); while (startPartShard.getKey().getEndKey() < endPartShard.getKey().getEndKey() - && startPartShard.getKey().getEndKey() < - PartitionUtils.MAX_VALUE /*排除最后一个分区*/) { + && startPartShard.getKey().getEndKey() < PartitionUtils.MAX_VALUE) { startPartShard = getPartitionByCode(graphName, startPartShard.getKey().getEndKey()); partitions.add(startPartShard); } @@ -635,7 +632,7 @@ public class PDClient { } /** - * 根据条件查询分区信息 + * Query partition information based on conditions * * @return * @throws PDException @@ -654,12 +651,6 @@ public class PDClient { return response.getPartitionsList(); } - /** - * 查找指定store上的指定partitionId - * - * @return - * @throws PDException - */ public List<Metapb.Partition> queryPartitions(long storeId, int partitionId) throws PDException { @@ -765,32 +756,30 @@ public class PDClient { } /** - * 删除分区缓存 + * Delete the cached partition */ public void invalidPartitionCache(String graphName, int partitionId) { - // 检查是否存在缓存 if (null != cache.getPartitionById(graphName, partitionId)) { cache.removePartition(graphName, partitionId); } } /** - * 删除分区缓存 + * Delete all cached partitions */ public void invalidPartitionCache() { - // 检查是否存在缓存 cache.removePartitions(); } /** - * 删除分区缓存 + * Delete the cached store */ public void invalidStoreCache(long storeId) { cache.removeStore(storeId); } /** - * Hugegraph server 调用,Leader发生改变,更新缓存 + * Called by the HugeGraph server: update the cache when the leader changes */ 
public void updatePartitionLeader(String graphName, int partId, long leaderStoreId) { KVPair<Metapb.Partition, Metapb.Shard> partShard = null; @@ -817,7 +806,6 @@ public class PDClient { if (config.isEnableCache()) { if (shard == null) { - // 分区的shard中未找到leader,说明分区发生了迁移 cache.removePartition(graphName, partId); } } @@ -828,7 +816,7 @@ public class PDClient { } /** - * Hugegraph-store调用,更新缓存 + * Update the cache * * @param partition */ @@ -904,10 +892,7 @@ public class PDClient { } catch (Exception e) { log.error(method.getFullMethodName() + " exception, {}", e.getMessage()); if (e instanceof StatusRuntimeException) { - StatusRuntimeException se = (StatusRuntimeException) e; - //se.getStatus() == Status.UNAVAILABLE && if (retry < stubProxy.getHostCount()) { - // 网络不通,关掉之前连接,换host重新连接 closeStub(true); return blockingUnaryCall(method, req, ++retry); } @@ -938,7 +923,7 @@ public class PDClient { } /** - * 返回Store状态信息 + * Returns the store status information */ public List<Metapb.Store> getStoreStatus(boolean offlineExcluded) throws PDException { Pdpb.GetAllStoresRequest request = Pdpb.GetAllStoresRequest.newBuilder() @@ -1031,8 +1016,12 @@ public class PDClient { } /** - * 工作模式 - * Auto:自动分裂,每个Store上分区数达到最大值 + * Working mode + * Auto:If the number of partitions on each store reaches the maximum value, you need to + * specify the store group id. The store group id is 0, which is the default partition + * splitData(ClusterOp.OperationMode mode, int storeGroupId, List<ClusterOp.SplitDataParam> + * params) + * mode = Auto storeGroupId, params * * @throws PDException */ @@ -1046,9 +1035,11 @@ public class PDClient { } /** - * 工作模式 - * Auto:自动分裂,每个Store上分区数达到最大值 - * Expert:专家模式,需要指定splitParams + * Working mode + * Auto:If the number of partitions on each store reaches the maximum value, you need to + * specify the store group id. 
The store group id is 0, which is the default partition + * Expert:Expert Mode,Specifier is required splitParams, limit SplitDataParam in the same + * store group * * @param mode * @param params @@ -1075,7 +1066,8 @@ public class PDClient { } /** - * 自动转移,达到每个Store上分区数量相同 + * To automatically transfer to the same number of partitions on each Store, it is + * recommended to use balancePartition(int storeGroupId) to specify the storeGroupId * * @throws PDException */ @@ -1090,12 +1082,13 @@ public class PDClient { } /** - * //工作模式 - * // Auto:自动转移,达到每个Store上分区数量相同 - * // Expert:专家模式,需要指定transferParams + * Migrate partitions in manual mode + * //Working mode + * // Auto:Automatic transfer to the same number of partitions per Store + * // Expert:Expert Mode,Specifier is required transferParams * - * @param mode - * @param params + * @param params Designation transferParams, expert mode,request source store / target store + * in the same store group * @throws PDException */ public void movePartition(Pdpb.OperationMode mode, List<Pdpb.MovePartitionParam> params) throws @@ -1128,7 +1121,7 @@ public class PDClient { } /** - * 平衡不同store中leader的数量 + * Balance the number of leaders in different stores */ public void balanceLeaders() throws PDException { Pdpb.BalanceLeadersRequest request = Pdpb.BalanceLeadersRequest.newBuilder() @@ -1139,7 +1132,7 @@ public class PDClient { } /** - * 从pd中删除store + * Remove the store from the PD */ public Metapb.Store delStore(long storeId) throws PDException { Pdpb.DetStoreRequest request = Pdpb.DetStoreRequest.newBuilder() @@ -1152,7 +1145,7 @@ public class PDClient { } /** - * 对rocksdb整体进行compaction + * Compaction on rocksdb as a whole * * @throws PDException */ @@ -1166,7 +1159,7 @@ public class PDClient { } /** - * 对rocksdb指定表进行compaction + * Compaction on rocksdb specified tables * * @param tableName * @throws PDException @@ -1182,9 +1175,9 @@ public class PDClient { } /** - * 分区合并,把当前的分区缩容至toCount个 + * Merge partitions to reduce the 
current partition to toCount * - * @param toCount 缩容到分区的个数 + * @param toCount The target number of partitions after scaling down * @throws PDException */ public void combineCluster(int toCount) throws PDException { @@ -1198,7 +1191,9 @@ public class PDClient { } /** - * 将单图缩容到 toCount个 + * Scaling a single graph down to toCount partitions; like splitting, it ensures that the number of + * partitions in the same store group is the same. + * If you have special requirements, you can consider migrating to other groups * * @param graphName graph name * @param toCount target count @@ -1228,7 +1223,7 @@ public class PDClient { } /** - * 用于 store的 shard list重建 + * Used for the store's shard list rebuild * * @param groupId shard group id * @param shards shard list,delete when shards size is 0 @@ -1316,7 +1311,7 @@ public class PDClient { public String nextHost() { String host = hostList.poll(); - hostList.offer(host); //移到尾部 + hostList.offer(host); return host; } diff --git a/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/PDConfig.java b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/PDConfig.java index a1c72a2bc..822eda3d5 100644 --- a/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/PDConfig.java +++ b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/PDConfig.java @@ -19,11 +19,13 @@ package org.apache.hugegraph.pd.client; public final class PDConfig { - //TODO multi-server + // TODO: multi-server private String serverHost = "localhost:9000"; - private long grpcTimeOut = 60000; // grpc调用超时时间 10秒 - // 是否接收PD异步通知 + // gRPC call timeout; 60000 is presumably in milliseconds (60 seconds, not 10) + private long grpcTimeOut = 60000; + + // Whether to receive asynchronous PD notifications private boolean enablePDNotify = false; private boolean enableCache = false; @@ -59,8 +61,6 @@ public final class PDConfig { @Deprecated public PDConfig setEnablePDNotify(boolean enablePDNotify) { this.enablePDNotify = enablePDNotify; - - // 
TODO 临时代码,hugegraph修改完后删除 this.enableCache = enablePDNotify; return this; } diff --git a/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/PDPulse.java b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/PDPulse.java index 485417b91..08ad7b178 100644 --- a/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/PDPulse.java +++ b/hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/PDPulse.java @@ -78,7 +78,8 @@ public interface PDPulse { Notifier<PartitionHeartbeatRequest.Builder> connectPartition(Listener<PulseResponse> listener); /** - * 切换成新的host。做 channel/host的检查,如果需要关闭,notifier调用close方法。 + * Switch to the new host. Do a channel/host check, and if you need to close, notifier calls + * the close method. * * @param host new host * @param notifier notifier
