This is an automated email from the ASF dual-hosted git repository. vgalaxies pushed a commit to branch intro-pd-service in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git
commit d2e46bc65f2179dcff4a00e667c8d9b82c116bf8 Author: VGalaxies <[email protected]> AuthorDate: Sat Apr 20 16:00:57 2024 +0800 format hg-pd-service --- hugegraph-pd/hg-pd-service/pom.xml | 4 +- .../org/apache/hugegraph/pd/boot/HugePDServer.java | 1 + .../apache/hugegraph/pd/metrics/MetricsConfig.java | 1 + .../org/apache/hugegraph/pd/metrics/PDMetrics.java | 1 + .../org/apache/hugegraph/pd/model/DemoModel.java | 1 + .../hugegraph/pd/model/GraphRestRequest.java | 1 + .../hugegraph/pd/model/GraphSpaceRestRequest.java | 1 + .../apache/hugegraph/pd/model/PeerRestRequest.java | 1 + .../hugegraph/pd/model/PromTargetsModel.java | 1 + .../apache/hugegraph/pd/model/RestApiResponse.java | 1 + .../hugegraph/pd/model/StoreRestRequest.java | 1 + .../hugegraph/pd/model/TimeRangeRequest.java | 1 + .../hugegraph/pd/notice/NoticeBroadcaster.java | 1 + .../pd/pulse/AbstractObserverSubject.java | 9 ++- .../apache/hugegraph/pd/pulse/PDPulseSubject.java | 53 ++++++++--------- .../apache/hugegraph/pd/pulse/PulseListener.java | 1 + .../org/apache/hugegraph/pd/rest/GraphAPI.java | 6 +- .../apache/hugegraph/pd/rest/GraphSpaceAPI.java | 2 +- .../org/apache/hugegraph/pd/rest/IndexAPI.java | 5 ++ .../org/apache/hugegraph/pd/rest/MemberAPI.java | 4 +- .../org/apache/hugegraph/pd/rest/PartitionAPI.java | 9 ++- .../org/apache/hugegraph/pd/rest/ShardAPI.java | 3 +- .../org/apache/hugegraph/pd/rest/StoreAPI.java | 3 +- .../java/org/apache/hugegraph/pd/rest/TaskAPI.java | 1 + .../java/org/apache/hugegraph/pd/rest/TestAPI.java | 2 - .../hugegraph/pd/service/KvServiceGrpcImpl.java | 2 - .../hugegraph/pd/service/PDPulseService.java | 5 +- .../apache/hugegraph/pd/service/PDRestService.java | 3 +- .../org/apache/hugegraph/pd/service/PDService.java | 2 - .../hugegraph/pd/service/PromTargetsService.java | 1 - .../hugegraph/pd/upgrade/VersionScriptFactory.java | 1 + .../pd/upgrade/scripts/TaskCleanUpgrade.java | 1 + .../org/apache/hugegraph/pd/util/DateUtil.java | 1 + .../apache/hugegraph/pd/util/HgExecutorUtil.java | 4 +- .../org/apache/hugegraph/pd/util/HgMapCache.java | 3 +- .../java/org/apache/hugegraph/pd/util/IdUtil.java | 1 + .../hugegraph/pd/util/grpc/GRpcServerConfig.java | 1 + .../hugegraph/pd/watch/AbstractWatchSubject.java | 9 ++- .../apache/hugegraph/pd/watch/KvWatchSubject.java | 20 +++---- .../hugegraph/pd/watch/NodeChangeSubject.java | 10 ++-- .../apache/hugegraph/pd/watch/PDWatchSubject.java | 7 +-- .../hugegraph/pd/watch/PartitionChangeSubject.java | 10 ++-- .../hg-pd-service/src/main/resources/log4j2.xml | 68 +++++++++++----------- .../hg-pd-service/src/test/resources/log4j2.xml | 68 +++++++++++----------- 44 files changed, 183 insertions(+), 148 deletions(-) diff --git a/hugegraph-pd/hg-pd-service/pom.xml b/hugegraph-pd/hg-pd-service/pom.xml index 6acab9d7c..682daba85 100644 --- a/hugegraph-pd/hg-pd-service/pom.xml +++ b/hugegraph-pd/hg-pd-service/pom.xml @@ -128,8 +128,8 @@ <version>1.2.0</version> <exclusions> <exclusion> - <groupId>org.apache.logging.log4j</groupId> - <artifactId>log4j-api</artifactId> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-api</artifactId> </exclusion> </exclusions> </dependency> diff --git a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/boot/HugePDServer.java b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/boot/HugePDServer.java index 03032c505..e6fff5e1b 100644 --- a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/boot/HugePDServer.java +++ b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/boot/HugePDServer.java @@ -29,6 +29,7 @@ import com.alipay.remoting.util.StringUtils; @ComponentScan(basePackages = {"org.apache.hugegraph.pd"}) @SpringBootApplication public class HugePDServer { + public static void main(String[] args) { String logPath = System.getProperty("logging.path"); if (StringUtils.isBlank(logPath)) { diff --git a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/metrics/MetricsConfig.java b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/metrics/MetricsConfig.java index c444cbf1b..71aead610 100644 --- a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/metrics/MetricsConfig.java +++ b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/metrics/MetricsConfig.java @@ -26,6 +26,7 @@ import io.micrometer.core.instrument.MeterRegistry; @Configuration public class MetricsConfig { + @Autowired private PDMetrics metrics; diff --git a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/metrics/PDMetrics.java b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/metrics/PDMetrics.java index 3d6e74ac7..bbebfbe51 100644 --- a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/metrics/PDMetrics.java +++ b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/metrics/PDMetrics.java @@ -34,6 +34,7 @@ import lombok.extern.slf4j.Slf4j; @Component @Slf4j public final class PDMetrics { + public static final String PREFIX = "hg"; private static final AtomicLong GRAPHS = new AtomicLong(0); private MeterRegistry registry; diff --git a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/model/DemoModel.java b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/model/DemoModel.java index dfb749536..fab6d7027 100644 --- a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/model/DemoModel.java +++ b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/model/DemoModel.java @@ -20,6 +20,7 @@ package org.apache.hugegraph.pd.model; import java.util.Objects; public class DemoModel { + private int status; private String text; diff --git a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/model/GraphRestRequest.java b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/model/GraphRestRequest.java index 22b7aedad..809713a5b 100644 --- a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/model/GraphRestRequest.java +++ b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/model/GraphRestRequest.java @@ -21,6 +21,7 @@ import lombok.Data; @Data public class GraphRestRequest { + private int partitionCount; private int shardCount; } diff --git a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/model/GraphSpaceRestRequest.java b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/model/GraphSpaceRestRequest.java index 77af6f2ad..da641681a 100644 --- a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/model/GraphSpaceRestRequest.java +++ b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/model/GraphSpaceRestRequest.java @@ -21,5 +21,6 @@ import lombok.Data; @Data public class GraphSpaceRestRequest { + private Long storageLimit; } diff --git a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/model/PeerRestRequest.java b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/model/PeerRestRequest.java index 53ccc3c87..8509b8abf 100644 --- a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/model/PeerRestRequest.java +++ b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/model/PeerRestRequest.java @@ -21,5 +21,6 @@ import lombok.Data; @Data public class PeerRestRequest { + private String peerList; } diff --git a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/model/PromTargetsModel.java b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/model/PromTargetsModel.java index 7531f555b..9ed16f962 100644 --- a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/model/PromTargetsModel.java +++ b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/model/PromTargetsModel.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Set; public class PromTargetsModel { + private static final String LABEL_METRICS_PATH = "__metrics_path__"; private static final String LABEL_SCHEME = "__scheme__"; private static final String LABEL_JOB_NAME = "job"; diff --git a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/model/RestApiResponse.java b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/model/RestApiResponse.java index 613c87f46..8aef76bf2 100644 --- a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/model/RestApiResponse.java +++ b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/model/RestApiResponse.java @@ -25,6 +25,7 @@ import lombok.Data; @Data public class RestApiResponse { + String message; Object data; int status; diff --git a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/model/StoreRestRequest.java b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/model/StoreRestRequest.java index 9283bb026..6fa8dd1f7 100644 --- a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/model/StoreRestRequest.java +++ b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/model/StoreRestRequest.java @@ -21,5 +21,6 @@ import lombok.Data; @Data public class StoreRestRequest { + String storeState; } diff --git a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/model/TimeRangeRequest.java b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/model/TimeRangeRequest.java index 5f26b80f8..617c70230 100644 --- a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/model/TimeRangeRequest.java +++ b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/model/TimeRangeRequest.java @@ -21,6 +21,7 @@ import lombok.Data; @Data public class TimeRangeRequest { + String startTime; String endTime; } diff --git a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/notice/NoticeBroadcaster.java b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/notice/NoticeBroadcaster.java index 385c9ae73..03ff2c11c 100644 --- a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/notice/NoticeBroadcaster.java +++ b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/notice/NoticeBroadcaster.java @@ -26,6 +26,7 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public class NoticeBroadcaster { + private final Supplier<Long> noticeSupplier; private long noticeId; private String durableId; diff --git a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/pulse/AbstractObserverSubject.java b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/pulse/AbstractObserverSubject.java index 029758420..431e479a5 100644 --- a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/pulse/AbstractObserverSubject.java +++ b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/pulse/AbstractObserverSubject.java @@ -37,6 +37,7 @@ import lombok.extern.slf4j.Slf4j; @ThreadSafe @Slf4j abstract class AbstractObserverSubject { + /* send notice to client */ private final Map<Long, StreamObserver<PulseResponse>> observerHolder = new HashMap<>(1024); /* notice from client */ @@ -149,7 +150,7 @@ abstract class AbstractObserverSubject { abstract long notifyClient(com.google.protobuf.GeneratedMessageV3 response); - protected void notifyError(int code, String message){ + protected void notifyError(int code, String message) { synchronized (lock) { Iterator<Map.Entry<Long, StreamObserver<PulseResponse>>> iter = observerHolder.entrySet().iterator(); @@ -158,10 +159,12 @@ abstract class AbstractObserverSubject { Long observerId = entry.getKey(); PulseResponse res = this.builder.setObserverId(observerId).build(); try { - entry.getValue().onError(Status.fromCodeValue(code).withDescription(message).asRuntimeException()); + entry.getValue().onError(Status.fromCodeValue(code).withDescription(message) + .asRuntimeException()); } catch (Throwable e) { log.warn("Failed to send {} 's notice[{}] to observer[{}], error:{}", - this.pulseType.name(), toNoticeString(res), observerId, e.getMessage()); + this.pulseType.name(), toNoticeString(res), observerId, + e.getMessage()); } } } diff --git a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/pulse/PDPulseSubject.java b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/pulse/PDPulseSubject.java index 03e9fd4e9..48fe2d772 100644 --- a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/pulse/PDPulseSubject.java +++ b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/pulse/PDPulseSubject.java @@ -60,17 +60,18 @@ import lombok.extern.slf4j.Slf4j; @Slf4j @ThreadSafe public class PDPulseSubject { + private final static long NOTICE_EXPIRATION_TIME = 30 * 60 * 1000; private final static int RETRYING_PERIOD_SECONDS = 60; private final static Map<String, AbstractObserverSubject> subjectHolder = - new ConcurrentHashMap<>(); + new ConcurrentHashMap<>(); private final static ConcurrentLinkedQueue<NoticeBroadcaster> broadcasterQueue = - new ConcurrentLinkedQueue<>(); + new ConcurrentLinkedQueue<>(); private final static ScheduledExecutorService scheduledExecutor = - Executors.newScheduledThreadPool(1); + Executors.newScheduledThreadPool(1); private static Supplier<List<Metapb.QueueItem>> queueRetrieveFunction = - () -> Collections.emptyList(); + () -> Collections.emptyList(); private static Function<Metapb.QueueItem, Boolean> queueDurableFunction = (e) -> true; private static Function<String, Boolean> queueRemoveFunction = (e) -> true; @@ -100,15 +101,15 @@ public class PDPulseSubject { private static void appendQueue() { broadcasterQueue.addAll( - getQueueItems() - .parallelStream() - .filter(e -> !broadcasterQueue - .stream() - .anyMatch(b -> e.getItemId().equals(b.getDurableId())) - ).map(e -> createBroadcaster(e)) - .peek(e -> log.info("Appending notice: {}", e)) - .filter(e -> e != null) - .collect(Collectors.toList()) + getQueueItems() + .parallelStream() + .filter(e -> !broadcasterQueue + .stream() + .anyMatch(b -> e.getItemId().equals(b.getDurableId())) + ).map(e -> createBroadcaster(e)) + .peek(e -> log.info("Appending notice: {}", e)) + .filter(e -> e != null) + .collect(Collectors.toList()) ); } @@ -134,13 +135,13 @@ public class PDPulseSubject { } public static void setQueueRetrieveFunction( - Supplier<List<Metapb.QueueItem>> queueRetrieveFunction) { + Supplier<List<Metapb.QueueItem>> queueRetrieveFunction) { HgAssert.isArgumentNotNull(queueRetrieveFunction, "queueRetrieveFunction"); PDPulseSubject.queueRetrieveFunction = queueRetrieveFunction; } public static void setQueueDurableFunction( - Function<Metapb.QueueItem, Boolean> queueDurableFunction) { + Function<Metapb.QueueItem, Boolean> queueDurableFunction) { HgAssert.isArgumentNotNull(queueDurableFunction, "queueDurableFunction"); PDPulseSubject.queueDurableFunction = queueDurableFunction; } @@ -157,7 +158,7 @@ public class PDPulseSubject { * @return */ public static StreamObserver<PulseRequest> addObserver( - StreamObserver<PulseResponse> responseObserver) { + StreamObserver<PulseResponse> responseObserver) { isArgumentNotNull(responseObserver, "responseObserver"); return new PDPulseStreamObserver(responseObserver); } @@ -219,7 +220,7 @@ public class PDPulseSubject { // } public static <T extends com.google.protobuf.GeneratedMessageV3> Supplier<Long> getNoticeSupplier( - T notice) { + T notice) { PulseType type; if (notice instanceof PdInstructionResponse) { type = PulseType.PULSE_TYPE_PD_INSTRUCTION; @@ -231,9 +232,8 @@ public class PDPulseSubject { return () -> getSubject(type).notifyClient(notice); } - private static Supplier<String> getDurableSupplier( - com.google.protobuf.GeneratedMessageV3 notice) { + com.google.protobuf.GeneratedMessageV3 notice) { return () -> { Metapb.QueueItem queueItem = toQueueItem(notice); String res = null; @@ -243,9 +243,9 @@ public class PDPulseSubject { res = queueItem.getItemId(); } else { log.error( - "Failed to persist queue-item that contained " + - "PartitionHeartbeatResponse: {}" - , notice); + "Failed to persist queue-item that contained " + + "PartitionHeartbeatResponse: {}" + , notice); } } catch (Throwable t) { log.error("Failed to invoke queueDurableFunction, cause by:", t); @@ -317,6 +317,7 @@ public class PDPulseSubject { /* inner classes below */ private static class PDPulseStreamObserver implements StreamObserver<PulseRequest> { + private final StreamObserver<PulseResponse> responseObserver; private AbstractObserverSubject subject; private Long observerId; @@ -329,7 +330,7 @@ public class PDPulseSubject { if (this.subject == null) { this.responseObserver.onError( - new Exception("Invoke cancel-observer before create-observer.")); + new Exception("Invoke cancel-observer before create-observer.")); return; } @@ -374,7 +375,7 @@ public class PDPulseSubject { if (subject == null) { responseObserver.onError( - new Exception("Unsupported pulse-type: " + pulseType.name())); + new Exception("Unsupported pulse-type: " + pulseType.name())); return null; } @@ -392,7 +393,7 @@ public class PDPulseSubject { log.info("send change leader command to watch, due to ERROR-100", pde); notifyClient(PdInstructionResponse.newBuilder() .setInstructionType( - PdInstructionType.CHANGE_TO_FOLLOWER) + PdInstructionType.CHANGE_TO_FOLLOWER) .setLeaderIp(RaftEngine.getInstance() .getLeaderGrpcAddress()) .build()); @@ -425,7 +426,7 @@ public class PDPulseSubject { if (pulseRequest.hasAckRequest()) { this.ackNotice(pulseRequest.getAckRequest().getNoticeId() - , pulseRequest.getAckRequest().getObserverId()); + , pulseRequest.getAckRequest().getObserverId()); } } diff --git a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/pulse/PulseListener.java b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/pulse/PulseListener.java index 09e0a7830..8a247257f 100644 --- a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/pulse/PulseListener.java +++ b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/pulse/PulseListener.java @@ -18,6 +18,7 @@ package org.apache.hugegraph.pd.pulse; public interface PulseListener<T> { + /** * Invoked on new notice. * diff --git a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/rest/GraphAPI.java b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/rest/GraphAPI.java index 9b3eb7662..68d80beb4 100644 --- a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/rest/GraphAPI.java +++ b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/rest/GraphAPI.java @@ -50,6 +50,7 @@ import lombok.extern.slf4j.Slf4j; @Slf4j @RequestMapping("/v1") public class GraphAPI extends API { + @Autowired PDRestService pdRestService; @Autowired @@ -125,7 +126,6 @@ public class GraphAPI extends API { } } - @GetMapping(value = "/graph/**", produces = MediaType.APPLICATION_JSON_VALUE) @ResponseBody public RestApiResponse getGraph(HttpServletRequest request) throws @@ -158,6 +158,7 @@ public class GraphAPI extends API { @Data class Shard { + long partitionId; long storeId; String state; @@ -184,6 +185,7 @@ public class GraphAPI extends API { @Data class Partition { + int partitionId; String graphName; String workState; @@ -227,13 +229,13 @@ public class GraphAPI extends API { this.shards = shardsList; } - } } } @Data class GraphStatistics { + //图统计信息 String graphName; long partitionCount; diff --git a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/rest/GraphSpaceAPI.java b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/rest/GraphSpaceAPI.java index c47e1869d..388f842e7 100644 --- a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/rest/GraphSpaceAPI.java +++ b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/rest/GraphSpaceAPI.java @@ -42,6 +42,7 @@ import lombok.extern.slf4j.Slf4j; @Slf4j @RequestMapping("/v1") public class GraphSpaceAPI extends API { + @Autowired PDRestService pdRestService; @@ -95,5 +96,4 @@ public class GraphSpaceAPI extends API { } } - } diff --git a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/rest/IndexAPI.java b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/rest/IndexAPI.java index 1ee2d3712..89f6e8624 100644 --- a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/rest/IndexAPI.java +++ b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/rest/IndexAPI.java @@ -44,6 +44,7 @@ import lombok.extern.slf4j.Slf4j; @Slf4j @RequestMapping("/") public class IndexAPI extends API { + @Autowired PDService pdService; @Autowired @@ -132,6 +133,7 @@ public class IndexAPI extends API { @Data class BriefStatistics { + String state; String leader; int memberSize; @@ -142,6 +144,7 @@ public class IndexAPI extends API { @Data class Store { + long storeId; String address; String raftAddress; @@ -164,6 +167,7 @@ public class IndexAPI extends API { @Data class Member { + String raftUrl; String grpcUrl; String restUrl; @@ -194,6 +198,7 @@ public class IndexAPI extends API { @Data class Statistics { + /** * 集群状态 */ diff --git a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/rest/MemberAPI.java b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/rest/MemberAPI.java index 28c9fdb98..b28b20a58 100644 --- a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/rest/MemberAPI.java +++ b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/rest/MemberAPI.java @@ -55,6 +55,7 @@ import lombok.extern.slf4j.Slf4j; @Slf4j @RequestMapping("/v1") public class MemberAPI extends API { + //TODO @Autowired PDService pdService; @@ -130,9 +131,9 @@ public class MemberAPI extends API { } } - public static class CallStreamObserverWrap<V> extends CallStreamObserver<V> implements Future<List<V>> { + CompletableFuture<List<V>> future = new CompletableFuture<>(); List<V> values = new ArrayList<>(); @@ -206,6 +207,7 @@ public class MemberAPI extends API { @Data class Member { + String raftUrl; String grpcUrl; String restUrl; diff --git a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/rest/PartitionAPI.java b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/rest/PartitionAPI.java index cdd3ac0ae..bdbdec39d 100644 --- a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/rest/PartitionAPI.java +++ b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/rest/PartitionAPI.java @@ -51,6 +51,7 @@ import lombok.extern.slf4j.Slf4j; @Slf4j @RequestMapping("/v1") public class PartitionAPI extends API { + public static final String DEFAULT_DATETIME_FORMAT = "yyyy-MM-dd HH:mm:ss"; @Autowired PDRestService pdRestService; @@ -240,7 +241,6 @@ public class PartitionAPI extends API { role = finalShardStats.get(shard.getStoreId()).getRole(); } - HashMap<Integer, Metapb.RaftStats> storeRaftStats = raftMap.get(shard.getStoreId()); if (storeRaftStats != null) { @@ -338,6 +338,7 @@ public class PartitionAPI extends API { @Data class Shard { + String address; String storeId; Metapb.ShardRole role; @@ -350,6 +351,7 @@ public class PartitionAPI extends API { @Data class Partition { + int id; long version; String graphName; @@ -360,7 +362,6 @@ public class PartitionAPI extends API { List<Shard> shards; String timestamp; - Partition(Metapb.Partition pt) { id = pt.getId(); version = pt.getVersion(); @@ -379,11 +380,11 @@ public class PartitionAPI extends API { @Data class Statistics { - } @Data class HighLevelPartition { + int partitionId; String state; String leaderAddress; @@ -430,6 +431,7 @@ public class PartitionAPI extends API { @Data class GraphStats { + String graphName; long keyCount; long startKey; @@ -449,6 +451,7 @@ public class PartitionAPI extends API { @Data class ShardStats { + long storeId; String role; String state; diff --git a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/rest/ShardAPI.java b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/rest/ShardAPI.java index 685d8fa3a..6cb5b09da 100644 --- a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/rest/ShardAPI.java +++ b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/rest/ShardAPI.java @@ -17,7 +17,6 @@ package org.apache.hugegraph.pd.rest; - import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -42,6 +41,7 @@ import lombok.extern.slf4j.Slf4j; @Slf4j @RequestMapping("/v1") public class ShardAPI extends API { + @Autowired PDRestService pdRestService; @Autowired @@ -109,6 +109,7 @@ public class ShardAPI extends API { @Data class Shard { + long storeId; long partitionId; String role; diff --git a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/rest/StoreAPI.java b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/rest/StoreAPI.java index 258727774..030d5de46 100644 --- a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/rest/StoreAPI.java +++ b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/rest/StoreAPI.java @@ -181,7 +181,6 @@ public class StoreAPI extends API { } } - @GetMapping(value = "store/{storeId}", produces = MediaType.APPLICATION_JSON_VALUE) @ResponseBody public RestApiResponse getStore(@PathVariable long storeId) { @@ -238,6 +237,7 @@ public class StoreAPI extends API { @Data class Partition { + //分区信息 int partitionId; String graphName; @@ -261,6 +261,7 @@ public class StoreAPI extends API { @Data class StoreStatistics { + //store的统计信息 long storeId; String address; diff --git a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/rest/TaskAPI.java b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/rest/TaskAPI.java index 06b8f0f8b..a1876141b 100644 --- a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/rest/TaskAPI.java +++ b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/rest/TaskAPI.java @@ -37,6 +37,7 @@ import lombok.extern.slf4j.Slf4j; @Slf4j @RequestMapping("/v1/task") public class TaskAPI extends API { + @Autowired PDRestService pdRestService; diff --git a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/rest/TestAPI.java b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/rest/TestAPI.java index f15be9b48..965cc83cd 100644 --- a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/rest/TestAPI.java +++ b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/rest/TestAPI.java @@ -130,7 +130,6 @@ public class TestAPI { .setItemClass("item-class") .setItemContent(response.toByteString()); - QueueStore store = MetadataFactory.newQueueStore(pdConfig); try { @@ -158,6 +157,5 @@ public class TestAPI { PDPulseSubject.notifyClient(PartitionHeartbeatResponse.newBuilder(buf)); }); - } } diff --git a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/service/KvServiceGrpcImpl.java b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/service/KvServiceGrpcImpl.java index 99c58937b..ffa8cdaab 100644 --- a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/service/KvServiceGrpcImpl.java +++ b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/service/KvServiceGrpcImpl.java @@ -357,7 +357,6 @@ public class KvServiceGrpcImpl extends KvServiceGrpc.KvServiceImplBase implement } - /** * 加锁 * @@ -484,7 +483,6 @@ public class KvServiceGrpcImpl extends KvServiceGrpc.KvServiceImplBase implement responseObserver.onCompleted(); } - /** * 锁续活 * diff --git a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/service/PDPulseService.java b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/service/PDPulseService.java index 83a1e1416..04db6ae35 100644 --- a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/service/PDPulseService.java +++ b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/service/PDPulseService.java @@ -42,8 +42,9 @@ import lombok.extern.slf4j.Slf4j; public class PDPulseService extends HgPdPulseGrpc.HgPdPulseImplBase { private static final Supplier<List<Metapb.QueueItem>> QUEUE_RETRIEVE_FUNCTION = - () -> Collections.emptyList(); - private static final Function<Metapb.QueueItem, Boolean> QUEUE_ITEM_BOOLEAN_FUNCTION = (e) -> true; + () -> Collections.emptyList(); + private static final Function<Metapb.QueueItem, Boolean> QUEUE_ITEM_BOOLEAN_FUNCTION = + (e) -> true; private static final Function<String, Boolean> QUEUE_REMOVE_FUNCTION = (e) -> true; @Autowired private PDConfig pdConfig; diff --git a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/service/PDRestService.java b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/service/PDRestService.java index 76ee4227b..ed902208c 100644 --- a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/service/PDRestService.java +++ b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/service/PDRestService.java @@ -50,6 +50,7 @@ import lombok.extern.slf4j.Slf4j; @Slf4j @Service public class PDRestService implements InitializingBean { + private static final String EMPTY_STRING = ""; @Autowired PDService pdService; @@ -79,7 +80,6 @@ public class PDRestService implements InitializingBean { HgAssert.isNotNull(partitionService, "partitionService does not initialize"); } - public List<Metapb.Store> getStores(String graphName) throws PDException { return storeNodeService.getStores(graphName); } @@ -253,7 +253,6 @@ public class PDRestService implements InitializingBean { return logService.getLog(LogService.NODE_CHANGE, start, end); } - public List<Metapb.LogRecord> getPartitionLog(Long start, Long end) throws PDException { return logService.getLog(LogService.PARTITION_CHANGE, start, end); } diff --git a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/service/PDService.java b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/service/PDService.java index 4ef740137..c9729bb52 100644 --- a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/service/PDService.java +++ b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/service/PDService.java @@ -191,7 +191,6 @@ public class PDService extends PDGrpc.PDImplBase implements ServiceGrpc, RaftSta } }); - /** * 监听分区指令,并转发给 Store */ @@ -1760,7 +1759,6 @@ public class PDService extends PDGrpc.PDImplBase implements ServiceGrpc, RaftSta observer.onCompleted(); } - private List<KVPair<String, PeerId>> parseConfig(String conf) { List<KVPair<String, PeerId>> result = new LinkedList<>(); diff --git a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/service/PromTargetsService.java b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/service/PromTargetsService.java index 5f163e17d..a5e6dc863 100644 --- a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/service/PromTargetsService.java +++ b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/service/PromTargetsService.java @@ -175,7 +175,6 @@ public class PromTargetsService { } }); - return model; }) .filter(e -> e != null) diff --git a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/upgrade/VersionScriptFactory.java b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/upgrade/VersionScriptFactory.java index 5e01952e5..bfee49d4e 100644 --- a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/upgrade/VersionScriptFactory.java +++ b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/upgrade/VersionScriptFactory.java @@ -24,6 +24,7 @@ import org.apache.hugegraph.pd.upgrade.scripts.PartitionMetaUpgrade; import org.apache.hugegraph.pd.upgrade.scripts.TaskCleanUpgrade; public class VersionScriptFactory { + private static final List<VersionUpgradeScript> SCRIPTS = new LinkedList<>(); private static volatile VersionScriptFactory factory; diff --git a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/upgrade/scripts/TaskCleanUpgrade.java b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/upgrade/scripts/TaskCleanUpgrade.java index bc302e31e..a876c0eb7 100644 --- a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/upgrade/scripts/TaskCleanUpgrade.java +++ b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/upgrade/scripts/TaskCleanUpgrade.java @@ -27,6 +27,7 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public class TaskCleanUpgrade implements VersionUpgradeScript { + @Override public String getHighVersion() { return UNLIMITED_VERSION; diff --git a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/util/DateUtil.java b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/util/DateUtil.java index 729e8c89f..94704dde8 100644 --- a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/util/DateUtil.java +++ b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/util/DateUtil.java @@ -25,6 +25,7 @@ import org.apache.hugegraph.pd.common.PDException; import org.apache.hugegraph.pd.grpc.Pdpb; public class DateUtil { + private static final String DATE = "yyyy-MM-dd"; private static final String DATETIME = "yyyy-MM-dd HH:mm:ss"; private static final String DATETIME_MM = "yyyy-MM-dd HH:mm"; diff --git a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/util/HgExecutorUtil.java b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/util/HgExecutorUtil.java index 26e7d9742..caf7f48ab 100644 --- a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/util/HgExecutorUtil.java +++ b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/util/HgExecutorUtil.java @@ -32,9 +32,9 @@ import org.apache.hugegraph.pd.common.HgAssert; import lombok.extern.slf4j.Slf4j; - @Slf4j public final class HgExecutorUtil { + private static final Map<String, ThreadPoolExecutor> EXECUTOR_MAP = new ConcurrentHashMap<>(); private static final Executor COMMON_EXECUTOR = new ThreadPoolExecutor(0, Integer.MAX_VALUE, @@ -123,6 +123,7 @@ public final class HgExecutorUtil { * The default thread factory */ static class HgThreadFactory implements ThreadFactory { + private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; private final int priority; @@ -152,6 +153,7 @@ public final class HgExecutorUtil { * The default thread factory, which added threadNamePrefix in construction method. */ static class HgDefaultThreadFactory implements ThreadFactory { + private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1); private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; diff --git a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/util/HgMapCache.java b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/util/HgMapCache.java index fe84bb4b0..e3187912b 100644 --- a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/util/HgMapCache.java +++ b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/util/HgMapCache.java @@ -26,6 +26,7 @@ import java.util.function.Supplier; * @param <V> */ public class HgMapCache<K, V> { + private final Map<K, V> cache = new ConcurrentHashMap<K, V>(); private final Supplier<Boolean> expiry; @@ -52,7 +53,6 @@ public class HgMapCache<K, V> { this.cache.put(key, value); } - public V get(K key) { if (isExpired()) { return null; @@ -77,6 +77,7 @@ public class HgMapCache<K, V> { } private static class CycleIntervalPolicy implements Supplier<Boolean> { + private long expireTime = 0; private long interval = 0; diff --git a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/util/IdUtil.java b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/util/IdUtil.java index 6ecba60e1..ed05aceb0 100644 --- a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/util/IdUtil.java +++ b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/util/IdUtil.java @@ -21,6 +21,7 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public final class IdUtil { + private static final byte[] LOCK = new byte[0]; public static String createMillisStr() { diff --git a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/util/grpc/GRpcServerConfig.java b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/util/grpc/GRpcServerConfig.java index 83c198a80..be8f98e47 100644 --- a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/util/grpc/GRpcServerConfig.java +++ b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/util/grpc/GRpcServerConfig.java @@ -27,6 +27,7 @@ import io.grpc.ServerBuilder; @Component public class GRpcServerConfig extends GRpcServerBuilderConfigurer { + public static final String EXECUTOR_NAME = "hg-grpc"; @Autowired private PDConfig pdConfig; diff --git a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/watch/AbstractWatchSubject.java b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/watch/AbstractWatchSubject.java index bf25a578b..3e2f0b538 100644 --- a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/watch/AbstractWatchSubject.java +++ b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/watch/AbstractWatchSubject.java @@ -36,6 +36,7 @@ import lombok.extern.slf4j.Slf4j; @ThreadSafe @Slf4j abstract class AbstractWatchSubject { + private final Map<Long, StreamObserver<WatchResponse>> watcherHolder = new HashMap<>(1024); private final byte[] lock = new byte[0]; private final WatchResponse.Builder builder = WatchResponse.newBuilder(); @@ -74,7 +75,7 @@ abstract class AbstractWatchSubject { abstract String toNoticeString(WatchResponse res); - public void notifyError(int code, String message){ + public void notifyError(int code, String message) { synchronized (lock) { Iterator<Map.Entry<Long, StreamObserver<WatchResponse>>> iter = watcherHolder.entrySet().iterator(); @@ -83,9 +84,11 @@ abstract class AbstractWatchSubject { Long watcherId = entry.getKey(); WatchResponse res = this.builder.setWatcherId(watcherId).build(); try { - entry.getValue().onError(Status.fromCodeValue(code).withDescription(message).asRuntimeException()); + entry.getValue().onError(Status.fromCodeValue(code).withDescription(message) + .asRuntimeException()); } catch (Throwable e) { - // log.error("Failed to send " + this.watchType.name() + "'s error message [" + toNoticeString(res) + // log.error("Failed to send " + this.watchType.name() + "'s error message [" + // + toNoticeString(res) // + "] to watcher[" + watcherId + "].", e); } } diff --git a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/watch/KvWatchSubject.java b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/watch/KvWatchSubject.java index c70016a31..f0109a623 100644 --- a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/watch/KvWatchSubject.java +++ b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/watch/KvWatchSubject.java @@ -51,7 +51,7 @@ public class KvWatchSubject { public static final String ALL_PREFIX = "W"; public static final long WATCH_TTL = 20000L; private static final ConcurrentMap<String, StreamObserver<WatchResponse>> clients = - new ConcurrentHashMap<>(); + new ConcurrentHashMap<>(); private final KvService kvService; BiPredicate<String, String> equal = String::equals; BiPredicate<String, String> startWith = String::startsWith; @@ -74,7 +74,7 @@ public class KvWatchSubject { String watchKey = KvService.getKeyWithoutPrefix(ALL_PREFIX, delimiter, key, clientId); kvService.put(watchKey, "", WATCH_TTL); String clientFirstKey = - KvService.getKeyWithoutPrefix(ALL_PREFIX, clientId, delimiter, key, clientId); + KvService.getKeyWithoutPrefix(ALL_PREFIX, clientId, delimiter, key, clientId); kvService.put(clientFirstKey, "", WATCH_TTL); } @@ -97,7 +97,7 @@ public class KvWatchSubject { public void addObserver(String key, long clientId, StreamObserver<WatchResponse> observer, String delimiter) throws PDException { String keyWithoutPrefix = - KvService.getKeyWithoutPrefix(ALL_PREFIX, delimiter, key, clientId); + KvService.getKeyWithoutPrefix(ALL_PREFIX, delimiter, key, clientId); clients.putIfAbsent(keyWithoutPrefix, observer); addWatchKey(key, delimiter, clientId); log.info("client:{},start to watch key:{}", clientId, key); @@ -106,7 +106,7 @@ public class KvWatchSubject { public void removeObserver(String key, long clientId, String delimiter) throws PDException { removeWatchKey(key, delimiter, clientId); String keyWithoutPrefix = - KvService.getKeyWithoutPrefix(ALL_PREFIX, delimiter, key, clientId); + KvService.getKeyWithoutPrefix(ALL_PREFIX, delimiter, key, clientId); clients.remove(keyWithoutPrefix); } @@ -143,15 +143,15 @@ public class KvWatchSubject { continue; } WatchKv watchKv = - WatchKv.newBuilder().setKey(kvKey).setValue(kv.getValue()).build(); + WatchKv.newBuilder().setKey(kvKey).setValue(kv.getValue()).build(); WatchEvent event = - WatchEvent.newBuilder().setCurrent(watchKv).setType(watchType).build(); + WatchEvent.newBuilder().setCurrent(watchKv).setType(watchType).build(); watchEvents.add(event); } StreamObserver<WatchResponse> observer = clients.get(keyAndClient); watchResponse = - WatchResponse.newBuilder().setState(WatchState.Started).setClientId(clientId) - .addAllEvents(watchEvents).build(); + WatchResponse.newBuilder().setState(WatchState.Started).setClientId(clientId) + .addAllEvents(watchEvents).build(); try { if (observer != null) { @@ -186,7 +186,7 @@ public class KvWatchSubject { WatchResponse testAlive = WatchResponse.newBuilder().setState(WatchState.Alive).build(); Set<Map.Entry<String, StreamObserver<WatchResponse>>> entries = clients.entrySet(); Map.Entry<String, StreamObserver<WatchResponse>>[] array = - entries.toArray(new Map.Entry[0]); + entries.toArray(new Map.Entry[0]); Arrays.stream(array).parallel().forEach(entry -> { StreamObserver<WatchResponse> value = entry.getValue(); String key = entry.getKey(); @@ -260,7 +260,7 @@ public class KvWatchSubject { */ public void notifyClientChangeLeader() { WatchResponse response = - WatchResponse.newBuilder().setState(WatchState.Leader_Changed).build(); + WatchResponse.newBuilder().setState(WatchState.Leader_Changed).build(); for (Map.Entry<String, StreamObserver<WatchResponse>> entry : clients.entrySet()) { StreamObserver<WatchResponse> value = entry.getValue(); String key = entry.getKey(); diff --git a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/watch/NodeChangeSubject.java b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/watch/NodeChangeSubject.java index 3036c0023..5ef1deee6 100644 --- a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/watch/NodeChangeSubject.java +++ b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/watch/NodeChangeSubject.java @@ -48,11 +48,11 @@ final class NodeChangeSubject extends AbstractWatchSubject { super.notifyWatcher(builder -> { builder.setNodeResponse( - builder.getNodeResponseBuilder().clear() - .setGraph(graph) - .setNodeId(nodeId) - .setNodeEventType(nodeEventType) - .build() + builder.getNodeResponseBuilder().clear() + .setGraph(graph) + .setNodeId(nodeId) + .setNodeEventType(nodeEventType) + .build() ); }); diff --git a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/watch/PDWatchSubject.java b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/watch/PDWatchSubject.java index c1a429eb8..3b1437221 100644 --- a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/watch/PDWatchSubject.java +++ b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/watch/PDWatchSubject.java @@ -38,6 +38,7 @@ import lombok.extern.slf4j.Slf4j; @Slf4j @ThreadSafe public class PDWatchSubject implements StreamObserver<WatchRequest> { + public final static Map<String, AbstractWatchSubject> subjectHolder = new ConcurrentHashMap<>(); private final static byte[] lock = new byte[0]; @@ -101,8 +102,8 @@ public class PDWatchSubject implements StreamObserver<WatchRequest> { subjectHolder.get(type.name()).notifyWatcher(builder); } - public static void notifyError(int code, String message){ - subjectHolder.forEach((k, v)->{ + public static void notifyError(int code, String message) { + subjectHolder.forEach((k, v) -> { v.notifyError(code, message); }); } @@ -132,7 +133,6 @@ public class PDWatchSubject implements StreamObserver<WatchRequest> { this.subject.removeObserver(this.watcherId, this.responseObserver); } - private WatchType getWatchType(WatchCreateRequest request) { WatchType watchType = request.getWatchType(); @@ -155,7 +155,6 @@ public class PDWatchSubject implements StreamObserver<WatchRequest> { return subject; } - private void addWatcher(WatchCreateRequest request) { if (this.subject != null) { return; diff --git a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/watch/PartitionChangeSubject.java b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/watch/PartitionChangeSubject.java index 43f5bc9b7..c7db46e8e 100644 --- a/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/watch/PartitionChangeSubject.java +++ b/hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/watch/PartitionChangeSubject.java @@ -50,11 +50,11 @@ final class PartitionChangeSubject extends AbstractWatchSubject { super.notifyWatcher(builder -> { builder.setPartitionResponse( - builder.getPartitionResponseBuilder().clear() - .setGraph(graph) - .setPartitionId(partitionId) - .setChangeType(changeType) - .build() + builder.getPartitionResponseBuilder().clear() + .setGraph(graph) + .setPartitionId(partitionId) + .setChangeType(changeType) + .build() ); }); diff --git a/hugegraph-pd/hg-pd-service/src/main/resources/log4j2.xml b/hugegraph-pd/hg-pd-service/src/main/resources/log4j2.xml index f3cd7c005..228f4d038 100644 --- a/hugegraph-pd/hg-pd-service/src/main/resources/log4j2.xml +++ b/hugegraph-pd/hg-pd-service/src/main/resources/log4j2.xml @@ -26,32 +26,32 @@ <appenders> <Console name="console" target="SYSTEM_OUT"> - <ThresholdFilter level="DEBUG" onMatch="ACCEPT" onMismatch="DENY"/> - <PatternLayout pattern="%-d{yyyy-MM-dd HH:mm:ss} [%t] [%p] %c{1.} - %m%n"/> + <ThresholdFilter level="DEBUG" onMatch="ACCEPT" onMismatch="DENY" /> + <PatternLayout pattern="%-d{yyyy-MM-dd HH:mm:ss} [%t] [%p] %c{1.} - %m%n" /> </Console> <!-- Normal server log config --> <RollingRandomAccessFile name="file" fileName="${LOG_PATH}/${FILE_NAME}.log" filePattern="${LOG_PATH}/$${date:yyyy-MM}/${FILE_NAME}-%d{yyyy-MM-dd}-%i.log" bufferedIO="true" bufferSize="524288" immediateFlush="false"> - <ThresholdFilter level="TRACE" onMatch="ACCEPT" onMismatch="DENY"/> - <PatternLayout pattern="%-d{yyyy-MM-dd HH:mm:ss} [%t] [%p] %c{1.} - %m%n"/> + <ThresholdFilter level="TRACE" onMatch="ACCEPT" onMismatch="DENY" /> + <PatternLayout pattern="%-d{yyyy-MM-dd HH:mm:ss} [%t] [%p] %c{1.} - %m%n" /> <!--JsonLayout compact="true" eventEol="true" complete="true" locationInfo="true"> <KeyValuePair key="timestamp" value="$${date:yyyy-MM-dd HH:mm:ss.SSS}"/> </JsonLayout--> <!-- Trigger after exceeding 1day or 50MB --> <Policies> - <SizeBasedTriggeringPolicy size="128MB"/> - <TimeBasedTriggeringPolicy interval="1" modulate="true"/> + <SizeBasedTriggeringPolicy size="128MB" /> + <TimeBasedTriggeringPolicy interval="1" modulate="true" /> </Policies> <!-- Keep 5 files per day & auto delete after over 2GB or 100 files --> <DefaultRolloverStrategy max="16"> <Delete basePath="${LOG_PATH}" maxDepth="2"> - <IfFileName glob="*/*.log"/> + <IfFileName glob="*/*.log" /> <!-- Limit log amount & size --> <IfAny> - <IfAccumulatedFileSize exceeds="2GB"/> - <IfAccumulatedFileCount exceeds="100"/> + <IfAccumulatedFileSize exceeds="2GB" /> + <IfAccumulatedFileCount exceeds="100" /> </IfAny> </Delete> </DefaultRolloverStrategy> @@ -61,24 +61,24 @@ <RollingRandomAccessFile name="raft_file" fileName="${LOG_PATH}/${FILE_NAME}_raft.log" filePattern="${LOG_PATH}/$${date:yyyy-MM}/${FILE_NAME}-%d{yyyy-MM-dd}-%i.log" bufferedIO="true" bufferSize="524288" immediateFlush="false"> - <ThresholdFilter level="TRACE" onMatch="ACCEPT" onMismatch="DENY"/> - <PatternLayout pattern="%-d{yyyy-MM-dd HH:mm:ss} [%t] [%p] %c{1.} - %m%n"/> + <ThresholdFilter level="TRACE" onMatch="ACCEPT" onMismatch="DENY" /> + <PatternLayout pattern="%-d{yyyy-MM-dd HH:mm:ss} [%t] [%p] %c{1.} - %m%n" /> <!--JsonLayout compact="true" eventEol="true" complete="true" locationInfo="true"> <KeyValuePair key="timestamp" value="$${date:yyyy-MM-dd HH:mm:ss.SSS}"/> </JsonLayout--> <!-- Trigger after exceeding 1day or 50MB --> <Policies> - <SizeBasedTriggeringPolicy size="128MB"/> - <TimeBasedTriggeringPolicy interval="1" modulate="true"/> + <SizeBasedTriggeringPolicy size="128MB" /> + <TimeBasedTriggeringPolicy interval="1" modulate="true" /> </Policies> <!-- Keep 5 files per day & auto Delete after over 2GB or 100 files --> <DefaultRolloverStrategy max="16"> <Delete basePath="${LOG_PATH}" maxDepth="2"> - <IfFileName glob="*/*.log"/> + <IfFileName glob="*/*.log" /> <!-- Limit log amount & size --> <IfAny> - <IfAccumulatedFileSize exceeds="2GB"/> - <IfAccumulatedFileCount exceeds="100"/> + <IfAccumulatedFileSize exceeds="2GB" /> + <IfAccumulatedFileCount exceeds="100" /> </IfAny> </Delete> </DefaultRolloverStrategy> @@ -88,25 +88,25 @@ <RollingRandomAccessFile name="audit" fileName="${LOG_PATH}/audit-${FILE_NAME}.log" filePattern="${LOG_PATH}/$${date:yyyy-MM}/audit-${FILE_NAME}-%d{yyyy-MM-dd-HH}-%i.gz" bufferedIO="true" bufferSize="524288" immediateFlush="false"> - <ThresholdFilter level="TRACE" onMatch="ACCEPT" onMismatch="DENY"/> + <ThresholdFilter level="TRACE" onMatch="ACCEPT" onMismatch="DENY" /> <!-- Use simple format for audit log to speed up --> <!-- PatternLayout pattern="%-d{yyyy-MM-dd HH:mm:ss} - %m%n"/ --> <JsonLayout compact="true" eventEol="true" locationInfo="true"> - <KeyValuePair key="timestamp" value="$${date:yyyy-MM-dd HH:mm:ss.SSS}"/> + <KeyValuePair key="timestamp" value="$${date:yyyy-MM-dd HH:mm:ss.SSS}" /> </JsonLayout> <!-- Trigger after exceeding 1hour or 500MB --> <Policies> - <SizeBasedTriggeringPolicy size="512MB"/> - <TimeBasedTriggeringPolicy interval="1" modulate="true"/> + <SizeBasedTriggeringPolicy size="512MB" /> + <TimeBasedTriggeringPolicy interval="1" modulate="true" /> </Policies> <!-- Keep 2 files per hour & auto delete [after 60 days] or [over 5GB or 500 files] --> <DefaultRolloverStrategy max="16"> <Delete basePath="${LOG_PATH}" maxDepth="2"> - <IfFileName glob="*/*.gz"/> - <IfLastModified age="60d"/> + <IfFileName glob="*/*.gz" /> + <IfLastModified age="60d" /> <IfAny> - <IfAccumulatedFileSize exceeds="5GB"/> - <IfAccumulatedFileCount exceeds="500"/> + <IfAccumulatedFileSize exceeds="5GB" /> + <IfAccumulatedFileCount exceeds="500" /> </IfAny> </Delete> </DefaultRolloverStrategy> @@ -115,25 +115,25 @@ <loggers> <root level="INFO"> - <appender-ref ref="file"/> - <appender-ref ref="console"/> + <appender-ref ref="file" /> + <appender-ref ref="console" /> </root> <logger name="com.alipay.sofa" level="INFO" additivity="false"> - <appender-ref ref="raft_file"/> - <appender-ref ref="console"/> + <appender-ref ref="raft_file" /> + <appender-ref ref="console" /> </logger> <logger name="io.netty" level="INFO" additivity="false"> - <appender-ref ref="file"/> - <appender-ref ref="console"/> + <appender-ref ref="file" /> + <appender-ref ref="console" /> </logger> <logger name="org.apache.commons" level="INFO" additivity="false"> - <appender-ref ref="file"/> - <appender-ref ref="console"/> + <appender-ref ref="file" /> + <appender-ref ref="console" /> </logger> <!-- Use mixed async way to output logs --> <AsyncLogger name="org.apache.hugegraph" level="INFO" additivity="false"> - <appender-ref ref="file"/> - <appender-ref ref="console"/> + <appender-ref ref="file" /> + <appender-ref ref="console" /> </AsyncLogger> </loggers> </configuration> diff --git a/hugegraph-pd/hg-pd-service/src/test/resources/log4j2.xml b/hugegraph-pd/hg-pd-service/src/test/resources/log4j2.xml index e51bcb30a..ea7d9a076 100644 --- a/hugegraph-pd/hg-pd-service/src/test/resources/log4j2.xml +++ b/hugegraph-pd/hg-pd-service/src/test/resources/log4j2.xml @@ -26,32 +26,32 @@ <appenders> <Console name="console" target="SYSTEM_OUT"> - <ThresholdFilter level="DEBUG" onMatch="ACCEPT" onMismatch="DENY"/> - <PatternLayout pattern="%-d{yyyy-MM-dd HH:mm:ss} [%t] [%p] %c{1.} - %m%n"/> + <ThresholdFilter level="DEBUG" onMatch="ACCEPT" onMismatch="DENY" /> + <PatternLayout pattern="%-d{yyyy-MM-dd HH:mm:ss} [%t] [%p] %c{1.} - %m%n" /> </Console> <!-- Normal server log config --> <RollingRandomAccessFile name="file" fileName="${LOG_PATH}/${FILE_NAME}.log" filePattern="${LOG_PATH}/$${date:yyyy-MM}/${FILE_NAME}-%d{yyyy-MM-dd}-%i.log" bufferedIO="true" bufferSize="524288" immediateFlush="false"> - <ThresholdFilter level="TRACE" onMatch="ACCEPT" onMismatch="DENY"/> - <PatternLayout pattern="%-d{yyyy-MM-dd HH:mm:ss} [%t] [%p] %c{1.} - %m%n"/> + <ThresholdFilter level="TRACE" onMatch="ACCEPT" onMismatch="DENY" /> + <PatternLayout pattern="%-d{yyyy-MM-dd HH:mm:ss} [%t] [%p] %c{1.} - %m%n" /> <!--JsonLayout compact="true" eventEol="true" complete="true" locationInfo="true"> <KeyValuePair key="timestamp" value="$${date:yyyy-MM-dd HH:mm:ss.SSS}"/> </JsonLayout--> <!-- Trigger after exceeding 1day or 50MB --> <Policies> - <SizeBasedTriggeringPolicy size="128MB"/> - <TimeBasedTriggeringPolicy interval="1" modulate="true"/> + <SizeBasedTriggeringPolicy size="128MB" /> + <TimeBasedTriggeringPolicy interval="1" modulate="true" /> </Policies> <!-- Keep 5 files per day & auto delete after over 2GB or 100 files --> <DefaultRolloverStrategy max="16"> <Delete basePath="${LOG_PATH}" maxDepth="2"> - <IfFileName glob="*/*.log"/> + <IfFileName glob="*/*.log" /> <!-- Limit log amount & size --> <IfAny> - <IfAccumulatedFileSize exceeds="2GB"/> - <IfAccumulatedFileCount exceeds="100"/> + <IfAccumulatedFileSize exceeds="2GB" /> + <IfAccumulatedFileCount exceeds="100" /> </IfAny> </Delete> </DefaultRolloverStrategy> @@ -61,24 +61,24 @@ <RollingRandomAccessFile name="raft_file" fileName="${LOG_PATH}/${FILE_NAME}_raft.log" filePattern="${LOG_PATH}/$${date:yyyy-MM}/${FILE_NAME}-%d{yyyy-MM-dd}-%i.log" bufferedIO="true" bufferSize="524288" immediateFlush="false"> - <ThresholdFilter level="TRACE" onMatch="ACCEPT" onMismatch="DENY"/> - <PatternLayout pattern="%-d{yyyy-MM-dd HH:mm:ss} [%t] [%p] %c{1.} - %m%n"/> + <ThresholdFilter level="TRACE" onMatch="ACCEPT" onMismatch="DENY" /> + <PatternLayout pattern="%-d{yyyy-MM-dd HH:mm:ss} [%t] [%p] %c{1.} - %m%n" /> <!--JsonLayout compact="true" eventEol="true" complete="true" locationInfo="true"> <KeyValuePair key="timestamp" value="$${date:yyyy-MM-dd HH:mm:ss.SSS}"/> </JsonLayout--> <!-- Trigger after exceeding 1day or 50MB --> <Policies> - <SizeBasedTriggeringPolicy size="128MB"/> - <TimeBasedTriggeringPolicy interval="1" modulate="true"/> + <SizeBasedTriggeringPolicy size="128MB" /> + <TimeBasedTriggeringPolicy interval="1" modulate="true" /> </Policies> <!-- Keep 5 files per day & auto Delete after over 2GB or 100 files --> <DefaultRolloverStrategy max="16"> <Delete basePath="${LOG_PATH}" maxDepth="2"> - <IfFileName glob="*/*.log"/> + <IfFileName glob="*/*.log" /> <!-- Limit log amount & size --> <IfAny> - <IfAccumulatedFileSize exceeds="2GB"/> - <IfAccumulatedFileCount exceeds="100"/> + <IfAccumulatedFileSize exceeds="2GB" /> + <IfAccumulatedFileCount exceeds="100" /> </IfAny> </Delete> </DefaultRolloverStrategy> @@ -88,25 +88,25 @@ <RollingRandomAccessFile name="audit" fileName="${LOG_PATH}/audit-${FILE_NAME}.log" filePattern="${LOG_PATH}/$${date:yyyy-MM}/audit-${FILE_NAME}-%d{yyyy-MM-dd-HH}-%i.gz" bufferedIO="true" bufferSize="524288" immediateFlush="false"> - <ThresholdFilter level="TRACE" onMatch="ACCEPT" onMismatch="DENY"/> + <ThresholdFilter level="TRACE" onMatch="ACCEPT" onMismatch="DENY" /> <!-- Use simple format for audit log to speed up --> <!-- PatternLayout pattern="%-d{yyyy-MM-dd HH:mm:ss} - %m%n"/ --> <JsonLayout compact="true" eventEol="true" locationInfo="true"> - <KeyValuePair key="timestamp" value="$${date:yyyy-MM-dd HH:mm:ss.SSS}"/> + <KeyValuePair key="timestamp" value="$${date:yyyy-MM-dd HH:mm:ss.SSS}" /> </JsonLayout> <!-- Trigger after exceeding 1hour or 500MB --> <Policies> - <SizeBasedTriggeringPolicy size="512MB"/> - <TimeBasedTriggeringPolicy interval="1" modulate="true"/> + <SizeBasedTriggeringPolicy size="512MB" /> + <TimeBasedTriggeringPolicy interval="1" modulate="true" /> </Policies> <!-- Keep 2 files per hour & auto delete [after 60 days] or [over 5GB or 500 files] --> <DefaultRolloverStrategy max="16"> <Delete basePath="${LOG_PATH}" maxDepth="2"> - <IfFileName glob="*/*.gz"/> - <IfLastModified age="60d"/> + <IfFileName glob="*/*.gz" /> + <IfLastModified age="60d" /> <IfAny> - <IfAccumulatedFileSize exceeds="5GB"/> - <IfAccumulatedFileCount exceeds="500"/> + <IfAccumulatedFileSize exceeds="5GB" /> + <IfAccumulatedFileCount exceeds="500" /> </IfAny> </Delete> </DefaultRolloverStrategy> @@ -115,25 +115,25 @@ <loggers> <root level="INFO"> - <appender-ref ref="file"/> - <appender-ref ref="console"/> + <appender-ref ref="file" /> + <appender-ref ref="console" /> </root> <logger name="com.alipay.sofa" level="INFO" additivity="false"> - <appender-ref ref="raft_file"/> - <appender-ref ref="console"/> + <appender-ref ref="raft_file" /> + <appender-ref ref="console" /> </logger> <logger name="io.netty" level="INFO" additivity="false"> - <appender-ref ref="file"/> - <appender-ref ref="console"/> + <appender-ref ref="file" /> + <appender-ref ref="console" /> </logger> <logger name="org.apache.commons" level="INFO" additivity="false"> - <appender-ref ref="file"/> - <appender-ref ref="console"/> + <appender-ref ref="file" /> + <appender-ref ref="console" /> </logger> <!-- Use mixed async way to output logs --> <AsyncLogger name="org.apache.hugegraph" level="INFO" additivity="false"> - <appender-ref ref="file"/> - <appender-ref ref="console"/> + <appender-ref ref="file" /> + <appender-ref ref="console" /> </AsyncLogger> </loggers> </configuration>
