This is an automated email from the ASF dual-hosted git repository. jin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git
commit b5d9dd2f0201847a771fe886611af3ace1729488 Author: VGalaxies <[email protected]> AuthorDate: Thu Apr 4 00:00:09 2024 +0800 feat(pd): integrate `pd-common` submodule --- hugegraph-pd/hg-pd-common/pom.xml | 54 +++ .../org/apache/hugegraph/pd/common/GraphCache.java | 62 +++ .../org/apache/hugegraph/pd/common/HgAssert.java | 117 ++++++ .../org/apache/hugegraph/pd/common/KVPair.java | 132 ++++++ .../apache/hugegraph/pd/common/PDException.java | 47 +++ .../hugegraph/pd/common/PDRuntimeException.java | 49 +++ .../apache/hugegraph/pd/common/PartitionCache.java | 458 +++++++++++++++++++++ .../apache/hugegraph/pd/common/PartitionUtils.java | 47 +++ 8 files changed, 966 insertions(+) diff --git a/hugegraph-pd/hg-pd-common/pom.xml b/hugegraph-pd/hg-pd-common/pom.xml new file mode 100644 index 000000000..918c8deab --- /dev/null +++ b/hugegraph-pd/hg-pd-common/pom.xml @@ -0,0 +1,54 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns="http://maven.apache.org/POM/4.0.0" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.hugegraph</groupId> + <artifactId>hugegraph-pd</artifactId> + <version>${revision}</version> + <relativePath>../pom.xml</relativePath> + </parent> + <artifactId>hg-pd-common</artifactId> + + <properties> + <maven.compiler.source>11</maven.compiler.source> + <maven.compiler.target>11</maven.compiler.target> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.hugegraph</groupId> + <artifactId>hg-pd-grpc</artifactId> + <version>${revision}</version> + </dependency> + <dependency> + <groupId>org.projectlombok</groupId> + <artifactId>lombok</artifactId> + <version>1.18.24</version> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-collections4</artifactId> + <version>4.4</version> + </dependency> + </dependencies> +</project> diff --git a/hugegraph-pd/hg-pd-common/src/main/java/org/apache/hugegraph/pd/common/GraphCache.java b/hugegraph-pd/hg-pd-common/src/main/java/org/apache/hugegraph/pd/common/GraphCache.java new file mode 100644 index 000000000..07c7c332d --- /dev/null +++ b/hugegraph-pd/hg-pd-common/src/main/java/org/apache/hugegraph/pd/common/GraphCache.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.pd.common; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.hugegraph.pd.grpc.Metapb.Graph; +import org.apache.hugegraph.pd.grpc.Metapb.Partition; + +import com.google.common.collect.RangeMap; +import com.google.common.collect.TreeRangeMap; + +import lombok.Data; + +@Data +public class GraphCache { + + private Graph graph; + private AtomicBoolean initialized = new AtomicBoolean(false); + private AtomicBoolean writing = new AtomicBoolean(false); + private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private Map<Integer, AtomicBoolean> state = new ConcurrentHashMap<>(); + private Map<Integer, Partition> partitions = new ConcurrentHashMap<>(); + private RangeMap<Long, Integer> range = TreeRangeMap.create(); + + public GraphCache(Graph graph) { + this.graph = graph; + } + + public GraphCache() { + } + + public Partition getPartition(Integer id) { + return partitions.get(id); + } + + public Partition addPartition(Integer id, Partition p) { + return partitions.put(id, p); + } + + public Partition removePartition(Integer id) { + return partitions.remove(id); + } +} diff --git a/hugegraph-pd/hg-pd-common/src/main/java/org/apache/hugegraph/pd/common/HgAssert.java b/hugegraph-pd/hg-pd-common/src/main/java/org/apache/hugegraph/pd/common/HgAssert.java new file mode 100644 index 000000000..710f96f28 --- /dev/null +++ b/hugegraph-pd/hg-pd-common/src/main/java/org/apache/hugegraph/pd/common/HgAssert.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.pd.common; + +import java.util.Collection; +import java.util.Map; + +public final class HgAssert { + + public static void isTrue(boolean expression, String message) { + if (message == null) { + throw new IllegalArgumentException("message is null"); + } + + if (!expression) { + throw new IllegalArgumentException(message); + } + } + + public static void isFalse(boolean expression, String message) { + isTrue(!expression, message); + } + + public static void isArgumentValid(byte[] bytes, String parameter) { + isFalse(isInvalid(bytes), "The argument is invalid: " + parameter); + } + + public static void isArgumentValid(String str, String parameter) { + isFalse(isInvalid(str), "The argument is invalid: " + parameter); + } + + public static void isArgumentNotNull(Object obj, String parameter) { + isTrue(obj != null, "The argument is null: " + parameter); + } + + public static void istValid(byte[] bytes, String msg) { + isFalse(isInvalid(bytes), msg); + } + + public static void isValid(String str, String msg) { + isFalse(isInvalid(str), msg); + } + + public static void isNotNull(Object obj, String msg) { + isTrue(obj != null, msg); + } + + public static boolean isContains(Object[] objs, Object obj) { + if (objs == null || objs.length == 0 || obj == null) { + return false; + } + for (Object item : objs) { + if (obj.equals(item)) { + return true; + } + } + return false; + } + + public static boolean isInvalid(String... strs) { + if (strs == null || strs.length == 0) { + return true; + } + for (String item : strs) { + if (item == null || "".equals(item.trim())) { + return true; + } + } + return false; + } + + public static boolean isInvalid(byte[] bytes) { + return bytes == null || bytes.length == 0; + } + + public static boolean isInvalid(Map<?, ?> map) { + return map == null || map.isEmpty(); + } + + public static boolean isInvalid(Collection<?> list) { + return list == null || list.isEmpty(); + } + + public static <T> boolean isContains(Collection<T> list, T item) { + if (list == null || item == null) { + return false; + } + return list.contains(item); + } + + public static boolean isNull(Object... objs) { + if (objs == null) { + return true; + } + for (Object item : objs) { + if (item == null) { + return true; + } + } + return false; + } +} diff --git a/hugegraph-pd/hg-pd-common/src/main/java/org/apache/hugegraph/pd/common/KVPair.java b/hugegraph-pd/hg-pd-common/src/main/java/org/apache/hugegraph/pd/common/KVPair.java new file mode 100644 index 000000000..b5e916c48 --- /dev/null +++ b/hugegraph-pd/hg-pd-common/src/main/java/org/apache/hugegraph/pd/common/KVPair.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.pd.common; + +import java.io.Serializable; +import java.util.Objects; + +public class KVPair<K, V> implements Serializable { + + /** + * Key of this <code>Pair</code>. + */ + private K key; + /** + * Value of this this <code>Pair</code>. + */ + private V value; + + /** + * Creates a new pair + * + * @param key The key for this pair + * @param value The value to use for this pair + */ + public KVPair(K key, V value) { + this.key = key; + this.value = value; + } + + /** + * Gets the key for this pair. + * + * @return key for this pair + */ + public K getKey() { + return key; + } + + public void setKey(K key) { + this.key = key; + } + + /** + * Gets the value for this pair. + * + * @return value for this pair + */ + public V getValue() { + return value; + } + + public void setValue(V value) { + this.value = value; + } + + /** + * <p><code>String</code> representation of this + * <code>Pair</code>.</p> + * + * <p>The default name/value delimiter '=' is always used.</p> + * + * @return <code>String</code> representation of this <code>Pair</code> + */ + @Override + public String toString() { + return key + "=" + value; + } + + /** + * <p>Generate a hash code for this <code>Pair</code>.</p> + * + * <p>The hash code is calculated using both the name and + * the value of the <code>Pair</code>.</p> + * + * @return hash code for this <code>Pair</code> + */ + @Override + public int hashCode() { + // name's hashCode is multiplied by an arbitrary prime number (13) + // in order to make sure there is a difference in the hashCode between + // these two parameters: + // name: a value: aa + // name: aa value: a + return key.hashCode() * 13 + (value == null ? 0 : value.hashCode()); + } + + /** + * <p>Test this <code>Pair</code> for equality with another + * <code>Object</code>.</p> + * + * <p>If the <code>Object</code> to be tested is not a + * <code>Pair</code> or is <code>null</code>, then this method + * returns <code>false</code>.</p> + * + * <p>Two <code>Pair</code>s are considered equal if and only if + * both the names and values are equal.</p> + * + * @param o the <code>Object</code> to test for + * equality with this <code>Pair</code> + * @return <code>true</code> if the given <code>Object</code> is + * equal to this <code>Pair</code> else <code>false</code> + */ + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o instanceof KVPair) { + KVPair pair = (KVPair) o; + if (!Objects.equals(key, pair.key)) { + return false; + } + return Objects.equals(value, pair.value); + } + return false; + } +} diff --git a/hugegraph-pd/hg-pd-common/src/main/java/org/apache/hugegraph/pd/common/PDException.java b/hugegraph-pd/hg-pd-common/src/main/java/org/apache/hugegraph/pd/common/PDException.java new file mode 100644 index 000000000..b398137e8 --- /dev/null +++ b/hugegraph-pd/hg-pd-common/src/main/java/org/apache/hugegraph/pd/common/PDException.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.pd.common; + +public class PDException extends Exception { + + private final int errorCode; + + public PDException(int error) { + super(String.format("Error code = %d", error)); + this.errorCode = error; + } + + public PDException(int error, String msg) { + super(msg); + this.errorCode = error; + } + + public PDException(int error, Throwable e) { + super(e); + this.errorCode = error; + } + + public PDException(int error, String msg, Throwable e) { + super(msg, e); + this.errorCode = error; + } + + public int getErrorCode() { + return errorCode; + } +} diff --git a/hugegraph-pd/hg-pd-common/src/main/java/org/apache/hugegraph/pd/common/PDRuntimeException.java b/hugegraph-pd/hg-pd-common/src/main/java/org/apache/hugegraph/pd/common/PDRuntimeException.java new file mode 100644 index 000000000..0bd90241d --- /dev/null +++ b/hugegraph-pd/hg-pd-common/src/main/java/org/apache/hugegraph/pd/common/PDRuntimeException.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.pd.common; + +public class PDRuntimeException extends RuntimeException { + + // public static final int LICENSE_ERROR = -11; + + private int errorCode = 0; + + public PDRuntimeException(int error) { + super(String.format("Error code = %d", error)); + this.errorCode = error; + } + + public PDRuntimeException(int error, String msg) { + super(msg); + this.errorCode = error; + } + + public PDRuntimeException(int error, Throwable e) { + super(e); + this.errorCode = error; + } + + public PDRuntimeException(int error, String msg, Throwable e) { + super(msg, e); + this.errorCode = error; + } + + public int getErrorCode() { + return errorCode; + } +} diff --git a/hugegraph-pd/hg-pd-common/src/main/java/org/apache/hugegraph/pd/common/PartitionCache.java b/hugegraph-pd/hg-pd-common/src/main/java/org/apache/hugegraph/pd/common/PartitionCache.java new file mode 100644 index 000000000..9bd233fd2 --- /dev/null +++ b/hugegraph-pd/hg-pd-common/src/main/java/org/apache/hugegraph/pd/common/PartitionCache.java @@ -0,0 +1,458 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.pd.common; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.hugegraph.pd.grpc.Metapb; + +import com.google.common.collect.Range; +import com.google.common.collect.RangeMap; +import com.google.common.collect.TreeRangeMap; + +/** + * 放弃 copy on write 的方式 + * 1. 在 graph * partition 数量极多的时候,效率严重下降,不能用 + */ +public class PartitionCache { + + // 读写锁对象 + private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + private final Map<String, AtomicBoolean> locks = new HashMap<>(); + Lock writeLock = readWriteLock.writeLock(); + // 每张图一个缓存 + private volatile Map<String, RangeMap<Long, Integer>> keyToPartIdCache; + // graphName + PartitionID 组成 key + private volatile Map<String, Map<Integer, Metapb.Partition>> partitionCache; + private volatile Map<Integer, Metapb.ShardGroup> shardGroupCache; + private volatile Map<Long, Metapb.Store> storeCache; + private volatile Map<String, Metapb.Graph> graphCache; + + public PartitionCache() { + keyToPartIdCache = new HashMap<>(); + partitionCache = new HashMap<>(); + shardGroupCache = new ConcurrentHashMap<>(); + storeCache = new ConcurrentHashMap<>(); + graphCache = new ConcurrentHashMap<>(); + } + + private AtomicBoolean getOrCreateGraphLock(String graphName) { + var lock = this.locks.get(graphName); + if (lock == null) { + try { + writeLock.lock(); + if ((lock = this.locks.get(graphName)) == null) { + lock = new AtomicBoolean(); + locks.put(graphName, lock); + } + } finally { + writeLock.unlock(); + } + } + return lock; + } + + public void waitGraphLock(String graphName) { + var lock = getOrCreateGraphLock(graphName); + while (lock.get()) { + Thread.onSpinWait(); + } + } + + public void lockGraph(String graphName) { + var lock = getOrCreateGraphLock(graphName); + while (lock.compareAndSet(false, true)) { + Thread.onSpinWait(); + } + } + + public void unlockGraph(String graphName) { + var lock = getOrCreateGraphLock(graphName); + lock.set(false); + } + + /** + * 根据 partitionId 返回分区信息 + * + * @param graphName + * @param partId + * @return + */ + public KVPair<Metapb.Partition, Metapb.Shard> getPartitionById(String graphName, int partId) { + waitGraphLock(graphName); + var graphs = partitionCache.get(graphName); + if (graphs != null) { + var partition = graphs.get(partId); + if (partition != null) { + return new KVPair<>(partition, getLeaderShard(partId)); + } + } + + return null; + } + + /** + * 返回 key 所在的分区信息 + * + * @param key + * @return + */ + public KVPair<Metapb.Partition, Metapb.Shard> getPartitionByKey(String graphName, byte[] key) { + int code = PartitionUtils.calcHashcode(key); + return getPartitionByCode(graphName, code); + } + + /** + * 根据 key 的 hashcode 返回分区信息 + * + * @param graphName + * @param code + * @return + */ + public KVPair<Metapb.Partition, Metapb.Shard> getPartitionByCode(String graphName, long code) { + waitGraphLock(graphName); + RangeMap<Long, Integer> rangeMap = keyToPartIdCache.get(graphName); + if (rangeMap != null) { + Integer partId = rangeMap.get(code); + if (partId != null) { + return getPartitionById(graphName, partId); + } + } + return null; + } + + public List<Metapb.Partition> getPartitions(String graphName) { + waitGraphLock(graphName); + + List<Metapb.Partition> partitions = new ArrayList<>(); + if (!partitionCache.containsKey(graphName)) { + return partitions; + } + partitionCache.get(graphName).forEach((k, v) -> { + partitions.add(v); + }); + + return partitions; + } + + public boolean addPartition(String graphName, int partId, Metapb.Partition partition) { + waitGraphLock(graphName); + Metapb.Partition old = null; + + if (partitionCache.containsKey(graphName)) { + old = partitionCache.get(graphName).get(partId); + } + + if (old != null && old.equals(partition)) { + return false; + } + try { + + lockGraph(graphName); + + partitionCache.computeIfAbsent(graphName, k -> new HashMap<>()).put(partId, partition); + + if (old != null) { + // old [1-3) 被 [2-3) 覆盖了。当 [1-3) 变成 [1-2) 不应该删除原先的 [1-3) + // 当确认老的 start, end 都是自己的时候,才可以删除老的。(即还没覆盖) + var graphRange = keyToPartIdCache.get(graphName); + if (Objects.equals(partition.getId(), graphRange.get(partition.getStartKey())) && + Objects.equals(partition.getId(), graphRange.get(partition.getEndKey() - 1))) { + graphRange.remove(graphRange.getEntry(partition.getStartKey()).getKey()); + } + } + + keyToPartIdCache.computeIfAbsent(graphName, k -> TreeRangeMap.create()) + .put(Range.closedOpen(partition.getStartKey(), + partition.getEndKey()), partId); + } finally { + unlockGraph(graphName); + } + return true; + } + + public void updatePartition(String graphName, int partId, Metapb.Partition partition) { + try { + lockGraph(graphName); + Metapb.Partition old = null; + var graphs = partitionCache.get(graphName); + if (graphs != null) { + old = graphs.get(partId); + } + + if (old != null) { + var graphRange = keyToPartIdCache.get(graphName); + if (Objects.equals(partition.getId(), graphRange.get(partition.getStartKey())) && + Objects.equals(partition.getId(), graphRange.get(partition.getEndKey() - 1))) { + graphRange.remove(graphRange.getEntry(partition.getStartKey()).getKey()); + } + } + + partitionCache.computeIfAbsent(graphName, k -> new HashMap<>()).put(partId, partition); + keyToPartIdCache.computeIfAbsent(graphName, k -> TreeRangeMap.create()) + .put(Range.closedOpen(partition.getStartKey(), partition.getEndKey()), + partId); + } finally { + unlockGraph(graphName); + } + } + + public boolean updatePartition(Metapb.Partition partition) { + + var graphName = partition.getGraphName(); + var partitionId = partition.getId(); + + var old = getPartitionById(graphName, partitionId); + if (old != null && Objects.equals(partition, old.getKey())) { + return false; + } + + updatePartition(graphName, partitionId, partition); + return true; + } + + public void removePartition(String graphName, int partId) { + try { + lockGraph(graphName); + var partition = partitionCache.get(graphName).remove(partId); + if (partition != null) { + var graphRange = keyToPartIdCache.get(graphName); + + if (Objects.equals(partition.getId(), graphRange.get(partition.getStartKey())) && + Objects.equals(partition.getId(), graphRange.get(partition.getEndKey() - 1))) { + graphRange.remove(graphRange.getEntry(partition.getStartKey()).getKey()); + } + } + } finally { + unlockGraph(graphName); + } + } + + /** + * remove partition id of graph name + * + * @param graphName + * @param id + */ + public void remove(String graphName, int id) { + removePartition(graphName, id); + } + + /** + * remove all partitions + */ + public void removePartitions() { + writeLock.lock(); + try { + partitionCache = new HashMap<>(); + keyToPartIdCache = new HashMap<>(); + locks.clear(); + } finally { + writeLock.unlock(); + } + } + + /** + * remove partition cache of graphName + * + * @param graphName + */ + public void removeAll(String graphName) { + try { + lockGraph(graphName); + partitionCache.remove(graphName); + keyToPartIdCache.remove(graphName); + locks.remove(graphName); + } finally { + unlockGraph(graphName); + } + } + + private String makePartitionKey(String graphName, int partId) { + return graphName + "/" + partId; + } + + public boolean updateShardGroup(Metapb.ShardGroup shardGroup) { + Metapb.ShardGroup oldShardGroup = shardGroupCache.get(shardGroup.getId()); + if (oldShardGroup != null && oldShardGroup.equals(shardGroup)) { + return false; + } + shardGroupCache.put(shardGroup.getId(), shardGroup); + return true; + } + + public void deleteShardGroup(int shardGroupId) { + shardGroupCache.remove(shardGroupId); + } + + public Metapb.ShardGroup getShardGroup(int groupId) { + return shardGroupCache.get(groupId); + } + + public boolean addStore(Long storeId, Metapb.Store store) { + Metapb.Store oldStore = storeCache.get(storeId); + if (oldStore != null && oldStore.equals(store)) { + return false; + } + storeCache.put(storeId, store); + return true; + } + + public Metapb.Store getStoreById(Long storeId) { + return storeCache.get(storeId); + } + + public void removeStore(Long storeId) { + storeCache.remove(storeId); + } + + public boolean hasGraph(String graphName) { + return getPartitions(graphName).size() > 0; + } + + public void updateGraph(Metapb.Graph graph) { + if (Objects.equals(graph, getGraph(graph.getGraphName()))) { + return; + } + graphCache.put(graph.getGraphName(), graph); + } + + public Metapb.Graph getGraph(String graphName) { + return graphCache.get(graphName); + } + + public List<Metapb.Graph> getGraphs() { + List<Metapb.Graph> graphs = new ArrayList<>(); + graphCache.forEach((k, v) -> { + graphs.add(v); + }); + return graphs; + } + + public void reset() { + writeLock.lock(); + try { + partitionCache = new HashMap<>(); + keyToPartIdCache = new HashMap<>(); + shardGroupCache = new ConcurrentHashMap<>(); + storeCache = new ConcurrentHashMap<>(); + graphCache = new ConcurrentHashMap<>(); + locks.clear(); + } finally { + writeLock.unlock(); + } + } + + public void clear() { + reset(); + } + + public String debugCacheByGraphName(String graphName) { + StringBuilder builder = new StringBuilder(); + builder.append("Graph:").append(graphName).append(", cache info: range info: {"); + var rangeMap = keyToPartIdCache.get(graphName); + builder.append(rangeMap == null ? "" : rangeMap).append("}"); + + if (rangeMap != null) { + builder.append(", partition info : {"); + rangeMap.asMapOfRanges().forEach((k, v) -> { + var partition = partitionCache.get(graphName).get(v); + builder.append("[part_id:").append(v); + if (partition != null) { + builder.append(", start_key:").append(partition.getStartKey()) + .append(", end_key:").append(partition.getEndKey()) + .append(", state:").append(partition.getState().name()); + } + builder.append("], "); + }); + builder.append("}"); + } + + builder.append(", graph info:{"); + var graph = graphCache.get(graphName); + if (graph != null) { + builder.append("partition_count:").append(graph.getPartitionCount()) + .append(", state:").append(graph.getState().name()); + } + builder.append("}]"); + return builder.toString(); + } + + public Metapb.Shard getLeaderShard(int partitionId) { + var shardGroup = shardGroupCache.get(partitionId); + if (shardGroup != null) { + for (Metapb.Shard shard : shardGroup.getShardsList()) { + if (shard.getRole() == Metapb.ShardRole.Leader) { + return shard; + } + } + } + + return null; + } + + public void updateShardGroupLeader(int partitionId, Metapb.Shard leader) { + if (shardGroupCache.containsKey(partitionId) && leader != null) { + if (!Objects.equals(getLeaderShard(partitionId), leader)) { + var shardGroup = shardGroupCache.get(partitionId); + var builder = Metapb.ShardGroup.newBuilder(shardGroup).clearShards(); + for (var shard : shardGroup.getShardsList()) { + builder.addShards( + Metapb.Shard.newBuilder() + .setStoreId(shard.getStoreId()) + .setRole(shard.getStoreId() == leader.getStoreId() ? + Metapb.ShardRole.Leader : + Metapb.ShardRole.Follower) + .build() + ); + } + shardGroupCache.put(partitionId, builder.build()); + } + } + } + + public String debugShardGroup() { + StringBuilder builder = new StringBuilder(); + builder.append("shard group cache:{"); + shardGroupCache.forEach((partitionId, shardGroup) -> { + builder.append(partitionId).append("::{") + .append("version:").append(shardGroup.getVersion()) + .append(", conf_version:").append(shardGroup.getConfVer()) + .append(", state:").append(shardGroup.getState().name()) + .append(", shards:["); + + for (var shard : shardGroup.getShardsList()) { + builder.append("{store_id:").append(shard.getStoreId()) + .append(", role:").append(shard.getRole().name()) + .append("},"); + } + builder.append("], "); + }); + builder.append("}"); + return builder.toString(); + } +} diff --git a/hugegraph-pd/hg-pd-common/src/main/java/org/apache/hugegraph/pd/common/PartitionUtils.java b/hugegraph-pd/hg-pd-common/src/main/java/org/apache/hugegraph/pd/common/PartitionUtils.java new file mode 100644 index 000000000..0e35cc555 --- /dev/null +++ b/hugegraph-pd/hg-pd-common/src/main/java/org/apache/hugegraph/pd/common/PartitionUtils.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.pd.common; + +public class PartitionUtils { + + public static final int MAX_VALUE = 0xffff; + + /** + * 计算key的hashcode + * + * @param key + * @return hashcode + */ + public static int calcHashcode(byte[] key) { + final int p = 16777619; + int hash = (int) 2166136261L; + for (byte element : key) { + hash = (hash ^ element) * p; + } + hash += hash << 13; + hash ^= hash >> 7; + hash += hash << 3; + hash ^= hash >> 17; + hash += hash << 5; + hash = hash & PartitionUtils.MAX_VALUE; + if (hash == PartitionUtils.MAX_VALUE) { + hash = PartitionUtils.MAX_VALUE - 1; + } + return hash; + } +}
