This is an automated email from the ASF dual-hosted git repository. duanzhengqiang pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/shardingsphere-plugin.git
commit 65661b8a1b853428e255473423668a3b55e25f2b Author: duanzhengqiang <[email protected]> AuthorDate: Wed Sep 27 15:02:20 2023 +0800 Move rc4, like, nacos and apollo spi implementation to shardingsphere plugin --- features/encrypt/like/pom.xml | 42 +++ .../algorithm/CharDigestLikeEncryptAlgorithm.java | 163 +++++++++ ...che.shardingsphere.encrypt.spi.EncryptAlgorithm | 18 + .../algorithm/like/common_chinese_character.dict | 18 + .../CharDigestLikeEncryptAlgorithmTest.java | 73 ++++ features/encrypt/rc4/pom.xml | 42 +++ .../encrypt/rc4/algorithm/RC4EncryptAlgorithm.java | 110 ++++++ ...che.shardingsphere.encrypt.spi.EncryptAlgorithm | 18 + .../rc4/algorithm/RC4EncryptAlgorithmTest.java | 82 +++++ jdbc/driver/pom.xml | 52 +++ .../jdbc/driver/ApolloURLProvider.java | 49 +++ ...iver.jdbc.core.driver.ShardingSphereURLProvider | 18 + jdbc/pom.xml | 33 ++ mode/cluster/pom.xml | 33 ++ mode/cluster/repository/nacos/pom.xml | 53 +++ .../repository/cluster/nacos/NacosRepository.java | 363 ++++++++++++++++++++ .../repository/cluster/nacos/entity/KeyValue.java | 35 ++ .../cluster/nacos/entity/ServiceController.java | 63 ++++ .../cluster/nacos/entity/ServiceMetaData.java | 56 +++ .../nacos/listener/NamingEventListener.java | 132 ++++++++ .../cluster/nacos/listener/WatchData.java | 39 +++ .../cluster/nacos/props/NacosProperties.java | 32 ++ .../cluster/nacos/props/NacosPropertyKey.java | 66 ++++ .../cluster/nacos/util/NacosMetaDataUtils.java | 82 +++++ ...ode.repository.cluster.ClusterPersistRepository | 18 + .../cluster/nacos/NacosRepositoryTest.java | 377 +++++++++++++++++++++ .../cluster/nacos/props/NacosPropertiesTest.java | 62 ++++ mode/cluster/repository/pom.xml | 33 ++ mode/pom.xml | 33 ++ pom.xml | 10 + 30 files changed, 2205 insertions(+) diff --git a/features/encrypt/like/pom.xml b/features/encrypt/like/pom.xml new file mode 100644 index 0000000..f461332 --- /dev/null +++ b/features/encrypt/like/pom.xml @@ -0,0 +1,42 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.shardingsphere</groupId> + <artifactId>shardingsphere-plugin-feature-encrypt</artifactId> + <version>5.4.1-SNAPSHOT</version> + </parent> + <artifactId>shardingsphere-plugin-feature-encrypt-like</artifactId> + <name>${project.artifactId}</name> + + <dependencies> + <dependency> + <groupId>org.apache.shardingsphere</groupId> + <artifactId>shardingsphere-encrypt-api</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.shardingsphere</groupId> + <artifactId>shardingsphere-encrypt-core</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> +</project> diff --git a/features/encrypt/like/src/main/java/org/apache/shardingsphere/encrypt/like/algorithm/CharDigestLikeEncryptAlgorithm.java b/features/encrypt/like/src/main/java/org/apache/shardingsphere/encrypt/like/algorithm/CharDigestLikeEncryptAlgorithm.java new file mode 100644 index 0000000..b909e53 --- /dev/null +++ b/features/encrypt/like/src/main/java/org/apache/shardingsphere/encrypt/like/algorithm/CharDigestLikeEncryptAlgorithm.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.encrypt.like.algorithm; + +import com.google.common.base.Strings; +import lombok.EqualsAndHashCode; +import lombok.SneakyThrows; +import org.apache.shardingsphere.encrypt.api.context.EncryptContext; +import org.apache.shardingsphere.encrypt.api.encrypt.like.LikeEncryptAlgorithm; +import org.apache.shardingsphere.encrypt.exception.algorithm.EncryptAlgorithmInitializationException; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.Scanner; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Char digest like encrypt algorithm. + */ +@EqualsAndHashCode +public final class CharDigestLikeEncryptAlgorithm implements LikeEncryptAlgorithm { + + private static final String DELTA_KEY = "delta"; + + private static final String MASK_KEY = "mask"; + + private static final String START_KEY = "start"; + + private static final String DICT_KEY = "dict"; + + private static final int DEFAULT_DELTA = 1; + + private static final int DEFAULT_MASK = 0b1111_0111_1101; + + private static final int DEFAULT_START = 0x4e00; + + private static final int MAX_NUMERIC_LETTER_CHAR = 255; + + private int delta; + + private int mask; + + private int start; + + private Map<Character, Integer> charIndexes; + + @Override + public void init(final Properties props) { + delta = createDelta(props); + mask = createMask(props); + start = createStart(props); + charIndexes = createCharIndexes(props); + } + + private int createDelta(final Properties props) { + if (props.containsKey(DELTA_KEY)) { + try { + return Integer.parseInt(props.getProperty(DELTA_KEY)); + } catch (final NumberFormatException ignored) { + throw new EncryptAlgorithmInitializationException(getType(), "delta can only be a decimal number"); + } + } + return DEFAULT_DELTA; + } + + private int createMask(final Properties props) { + if (props.containsKey(MASK_KEY)) { + try { + return Integer.parseInt(props.getProperty(MASK_KEY)); + } catch (final NumberFormatException ignored) { + throw new EncryptAlgorithmInitializationException(getType(), "mask can only be a decimal number"); + } + } + return DEFAULT_MASK; + } + + private int createStart(final Properties props) { + if (props.containsKey(START_KEY)) { + try { + return Integer.parseInt(props.getProperty(START_KEY)); + } catch (final NumberFormatException ignored) { + throw new EncryptAlgorithmInitializationException(getType(), "start can only be a decimal number"); + } + } + return DEFAULT_START; + } + + private Map<Character, Integer> createCharIndexes(final Properties props) { + String dictContent = props.containsKey(DICT_KEY) && !Strings.isNullOrEmpty(props.getProperty(DICT_KEY)) ? props.getProperty(DICT_KEY) : initDefaultDict(); + return IntStream.range(0, dictContent.length()).boxed().collect(Collectors.toMap(dictContent::charAt, index -> index, (oldValue, currentValue) -> oldValue)); + } + + @SneakyThrows(IOException.class) + private String initDefaultDict() { + StringBuilder result = new StringBuilder(); + try ( + InputStream inputStream = Objects.requireNonNull(Thread.currentThread().getContextClassLoader().getResourceAsStream("algorithm/like/common_chinese_character.dict")); + Scanner scanner = new Scanner(inputStream)) { + while (scanner.hasNextLine()) { + String line = scanner.nextLine(); + if (!line.isEmpty() && !line.startsWith("#")) { + result.append(line); + } + } + } + return result.toString(); + } + + @Override + public String encrypt(final Object plainValue, final EncryptContext encryptContext) { + return null == plainValue ? null : digest(String.valueOf(plainValue)); + } + + private String digest(final String plainValue) { + StringBuilder result = new StringBuilder(plainValue.length()); + for (char each : plainValue.toCharArray()) { + char maskedChar = getMaskedChar(each); + if ('%' == maskedChar || '_' == maskedChar) { + result.append(each); + } else { + result.append(maskedChar); + } + } + return result.toString(); + } + + private char getMaskedChar(final char originalChar) { + if ('%' == originalChar || '_' == originalChar) { + return originalChar; + } + if (originalChar <= MAX_NUMERIC_LETTER_CHAR) { + return (char) ((originalChar + delta) & mask); + } + if (charIndexes.containsKey(originalChar)) { + return (char) (((charIndexes.get(originalChar) + delta) & mask) + start); + } + return (char) (((originalChar + delta) & mask) + start); + } + + @Override + public String getType() { + return "CHAR_DIGEST_LIKE"; + } +} diff --git a/features/encrypt/like/src/main/resources/META-INF/services/org.apache.shardingsphere.encrypt.spi.EncryptAlgorithm b/features/encrypt/like/src/main/resources/META-INF/services/org.apache.shardingsphere.encrypt.spi.EncryptAlgorithm new file mode 100644 index 0000000..2e4fdf0 --- /dev/null +++ b/features/encrypt/like/src/main/resources/META-INF/services/org.apache.shardingsphere.encrypt.spi.EncryptAlgorithm @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.shardingsphere.encrypt.like.algorithm.CharDigestLikeEncryptAlgorithm diff --git a/features/encrypt/like/src/main/resources/algorithm/like/common_chinese_character.dict b/features/encrypt/like/src/main/resources/algorithm/like/common_chinese_character.dict new file mode 100644 index 0000000..83c5ea5 --- /dev/null +++ b/features/encrypt/like/src/main/resources/algorithm/like/common_chinese_character.dict @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +谤杉巫夏辅俯鸵直菱梨滨头矾讯芯巡泥簇何逊谜颐男拴冰响贰哈雄赌密愚思戊叔苔肩亏硕递意忻雷扫旋谭坎散拷廖余饮囤咖小娄藻唇妙枣豺料淡器谈赘托察湿莽算诽嘛越魁涤缮漱镣豁孔萝脾稻杠遮漏浪常滦呸层涪粪乐萄褐榜程伙椭句蚤入概鹰惨闻舶封瘴创详筹边络乞洪咯丈眯弛娱狸锐俄痔向片壬二剧壳羌劳澳到箔慧醛俏硬读宁谨谋捶牧豆脱饰肛渠坞富揉赤帽能巩雁题姬崔旁袄主蝇屏撼粗贡刃办肉穷态柳嚼鸥哨兔瓦籍谊唯指混丰肤值邹脸爪摄兑券浆薄漾盟磊牺筑锤匿碑萍拌醉焊扞韭群擦飞尺从咆蜘辗帅蘑懦彭翠痛联热惮溯啦乏沏颈渔奋曲衣焦迄喳狞登冯制腐青围业瑶拨碉赢幽开狠耐熄培朝编堂嫌媚王跳剖骸四爬氛既遣鱼氢利龟爱斯毅屠氦止邀厉员拭佛针牲菌耳罕诺虐倦瓣渐丛哼麻饱且楷诀港贤酣李管撅茧犊关聪凉试弦煤谐掣戈膊藩翘扬契货鸿逮众� ��汾芍凋撬罩数反罢背睛琵觉芒箩嫂钡豹峭箱纤拄哑留损售布旅搁蒲藕阶郝披恃邢落殉证盗鞠竟亢勺刀足畦熙帛曹娇宗亭蔑靖须医疮凯灾鸡蔫室耪沃阂伎墨卷随釜巾浴领臣啊绊纫较后圃桨厚教磋颖姥你荤丧沪芳栋休未茁舵林啥国糖规括搪扯补摹近绸履瞄兄般轧坛够雀烂锨嘱聚陵篷调躬印祈挟胶饯雪甸渤当裕汝鲸秽咬岸解剂丁勉鞋缺雨觅伶顿镁掠鸦埔郡场欧商推硒 [...] diff --git a/features/encrypt/like/src/test/java/org/apache/shardingsphere/encrypt/like/algorithm/CharDigestLikeEncryptAlgorithmTest.java b/features/encrypt/like/src/test/java/org/apache/shardingsphere/encrypt/like/algorithm/CharDigestLikeEncryptAlgorithmTest.java new file mode 100644 index 0000000..72adce4 --- /dev/null +++ b/features/encrypt/like/src/test/java/org/apache/shardingsphere/encrypt/like/algorithm/CharDigestLikeEncryptAlgorithmTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.encrypt.like.algorithm; + +import org.apache.shardingsphere.encrypt.api.context.EncryptContext; +import org.apache.shardingsphere.encrypt.api.encrypt.like.LikeEncryptAlgorithm; +import org.apache.shardingsphere.encrypt.spi.EncryptAlgorithm; +import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Properties; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.Mockito.mock; + +class CharDigestLikeEncryptAlgorithmTest { + + private LikeEncryptAlgorithm englishLikeEncryptAlgorithm; + + private LikeEncryptAlgorithm chineseLikeEncryptAlgorithm; + + private LikeEncryptAlgorithm koreanLikeEncryptAlgorithm; + + @BeforeEach + void setUp() { + englishLikeEncryptAlgorithm = (LikeEncryptAlgorithm) TypedSPILoader.getService(EncryptAlgorithm.class, "CHAR_DIGEST_LIKE"); + chineseLikeEncryptAlgorithm = (LikeEncryptAlgorithm) TypedSPILoader.getService(EncryptAlgorithm.class, "CHAR_DIGEST_LIKE"); + Properties props = new Properties(); + props.put("dict", "한국어시험"); + props.put("start", "44032"); + koreanLikeEncryptAlgorithm = (LikeEncryptAlgorithm) TypedSPILoader.getService(EncryptAlgorithm.class, "CHAR_DIGEST_LIKE", props); + } + + @Test + void assertEncrypt() { + assertThat(englishLikeEncryptAlgorithm.encrypt("1234567890%abcdefghijklmnopqrstuvwxyz%ABCDEFGHIJKLMNOPQRSTUVWXYZ", + mock(EncryptContext.class)), is("0145458981%`adedehihilmlmpqpqtutuxyxy%@ADEDEHIHILMLMPQPQTUTUXYXY")); + assertThat(englishLikeEncryptAlgorithm.encrypt("_1234__5678__", mock(EncryptContext.class)), is("_0145__4589__")); + } + + @Test + void assertEncryptWithChineseChar() { + assertThat(chineseLikeEncryptAlgorithm.encrypt("中国", mock(EncryptContext.class)), is("婝估")); + } + + @Test + void assertEncryptWithKoreanChar() { + assertThat(koreanLikeEncryptAlgorithm.encrypt("한국", mock(EncryptContext.class)), is("각가")); + } + + @Test + void assertEncryptWithNullPlaintext() { + assertNull(englishLikeEncryptAlgorithm.encrypt(null, mock(EncryptContext.class))); + } +} diff --git a/features/encrypt/rc4/pom.xml b/features/encrypt/rc4/pom.xml new file mode 100644 index 0000000..60846cb --- /dev/null +++ b/features/encrypt/rc4/pom.xml @@ -0,0 +1,42 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.shardingsphere</groupId> + <artifactId>shardingsphere-plugin-feature-encrypt</artifactId> + <version>5.4.1-SNAPSHOT</version> + </parent> + <artifactId>shardingsphere-plugin-feature-encrypt-rc4</artifactId> + <name>${project.artifactId}</name> + + <dependencies> + <dependency> + <groupId>org.apache.shardingsphere</groupId> + <artifactId>shardingsphere-encrypt-api</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.shardingsphere</groupId> + <artifactId>shardingsphere-encrypt-core</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> +</project> diff --git a/features/encrypt/rc4/src/main/java/org/apache/shardingsphere/encrypt/rc4/algorithm/RC4EncryptAlgorithm.java b/features/encrypt/rc4/src/main/java/org/apache/shardingsphere/encrypt/rc4/algorithm/RC4EncryptAlgorithm.java new file mode 100644 index 0000000..25679cd --- /dev/null +++ b/features/encrypt/rc4/src/main/java/org/apache/shardingsphere/encrypt/rc4/algorithm/RC4EncryptAlgorithm.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.encrypt.rc4.algorithm; + +import lombok.EqualsAndHashCode; +import org.apache.commons.codec.binary.Base64; +import org.apache.shardingsphere.encrypt.api.context.EncryptContext; +import org.apache.shardingsphere.encrypt.api.encrypt.standard.StandardEncryptAlgorithm; +import org.apache.shardingsphere.encrypt.exception.algorithm.EncryptAlgorithmInitializationException; +import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions; + +import java.nio.charset.StandardCharsets; +import java.util.Properties; + +/** + * RC4 encrypt algorithm. + */ +@EqualsAndHashCode +public final class RC4EncryptAlgorithm implements StandardEncryptAlgorithm { + + private static final String RC4_KEY = "rc4-key-value"; + + private static final int KEY_MIN_LENGTH = 5; + + private static final int SBOX_LENGTH = 256; + + private byte[] key; + + @Override + public void init(final Properties props) { + key = getKey(props); + } + + private byte[] getKey(final Properties props) { + byte[] result = props.getProperty(RC4_KEY, "").getBytes(StandardCharsets.UTF_8); + ShardingSpherePreconditions.checkState(KEY_MIN_LENGTH <= result.length && SBOX_LENGTH > result.length, + () -> new EncryptAlgorithmInitializationException(getType(), "Key length has to be between " + KEY_MIN_LENGTH + " and " + (SBOX_LENGTH - 1))); + return result; + } + + @Override + public String encrypt(final Object plainValue, final EncryptContext encryptContext) { + return null == plainValue ? null : Base64.encodeBase64String(crypt(String.valueOf(plainValue).getBytes(StandardCharsets.UTF_8))); + } + + @Override + public Object decrypt(final Object cipherValue, final EncryptContext encryptContext) { + return null == cipherValue ? null : new String(crypt(Base64.decodeBase64(cipherValue.toString())), StandardCharsets.UTF_8); + } + + /* + * @see <a href="http://en.wikipedia.org/wiki/RC4#Pseudo-random_generation_algorithm_.28PRGA.29">Pseudo-random generation algorithm</a> + */ + private byte[] crypt(final byte[] message) { + int[] sBox = getSBox(); + byte[] result = new byte[message.length]; + int i = 0; + int j = 0; + for (int n = 0; n < message.length; n++) { + i = (i + 1) % SBOX_LENGTH; + j = (j + sBox[i]) % SBOX_LENGTH; + swap(i, j, sBox); + int rand = sBox[(sBox[i] + sBox[j]) % SBOX_LENGTH]; + result[n] = (byte) (rand ^ message[n]); + } + return result; + } + + /* + * @see <a href="http://en.wikipedia.org/wiki/RC4#Key-scheduling_algorithm_.28KSA.29">Wikipedia. Init sBox</a> + */ + private int[] getSBox() { + int[] result = new int[SBOX_LENGTH]; + int j = 0; + for (int i = 0; i < SBOX_LENGTH; i++) { + result[i] = i; + } + for (int i = 0; i < SBOX_LENGTH; i++) { + j = (j + result[i] + (key[i % key.length]) & 0xFF) % SBOX_LENGTH; + swap(i, j, result); + } + return result; + } + + private void swap(final int i, final int j, final int[] sBox) { + int temp = sBox[i]; + sBox[i] = sBox[j]; + sBox[j] = temp; + } + + @Override + public String getType() { + return "RC4"; + } +} diff --git a/features/encrypt/rc4/src/main/resources/META-INF/services/org.apache.shardingsphere.encrypt.spi.EncryptAlgorithm b/features/encrypt/rc4/src/main/resources/META-INF/services/org.apache.shardingsphere.encrypt.spi.EncryptAlgorithm new file mode 100644 index 0000000..44b6f0f --- /dev/null +++ b/features/encrypt/rc4/src/main/resources/META-INF/services/org.apache.shardingsphere.encrypt.spi.EncryptAlgorithm @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.shardingsphere.encrypt.rc4.algorithm.RC4EncryptAlgorithm diff --git a/features/encrypt/rc4/src/test/java/org/apache/shardingsphere/encrypt/rc4/algorithm/RC4EncryptAlgorithmTest.java b/features/encrypt/rc4/src/test/java/org/apache/shardingsphere/encrypt/rc4/algorithm/RC4EncryptAlgorithmTest.java new file mode 100644 index 0000000..b0523dc --- /dev/null +++ b/features/encrypt/rc4/src/test/java/org/apache/shardingsphere/encrypt/rc4/algorithm/RC4EncryptAlgorithmTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.encrypt.rc4.algorithm; + +import org.apache.shardingsphere.encrypt.api.context.EncryptContext; +import org.apache.shardingsphere.encrypt.api.encrypt.standard.StandardEncryptAlgorithm; +import org.apache.shardingsphere.encrypt.exception.algorithm.EncryptAlgorithmInitializationException; +import org.apache.shardingsphere.encrypt.spi.EncryptAlgorithm; +import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Properties; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; + +class RC4EncryptAlgorithmTest { + + private StandardEncryptAlgorithm encryptAlgorithm; + + @BeforeEach + void setUp() { + Properties props = new Properties(); + props.put("rc4-key-value", "test-sharding"); + encryptAlgorithm = (StandardEncryptAlgorithm) TypedSPILoader.getService(EncryptAlgorithm.class, "RC4", props); + } + + @Test + void assertEncode() { + assertThat(encryptAlgorithm.encrypt("test", mock(EncryptContext.class)), is("4Tn7lQ==")); + } + + @Test + void assertEncryptNullValue() { + assertNull(encryptAlgorithm.encrypt(null, mock(EncryptContext.class))); + } + + @Test + void assertKeyIsTooLong() { + Properties props = new Properties(); + props.put("rc4-key-value", IntStream.range(0, 100).mapToObj(each -> "test").collect(Collectors.joining())); + assertThrows(EncryptAlgorithmInitializationException.class, () -> encryptAlgorithm.init(props)); + } + + @Test + void assertKeyIsTooShort() { + Properties props = new Properties(); + props.put("rc4-key-value", "test"); + assertThrows(EncryptAlgorithmInitializationException.class, () -> encryptAlgorithm.init(props)); + } + + @Test + void assertDecode() { + assertThat(encryptAlgorithm.decrypt("4Tn7lQ==", mock(EncryptContext.class)).toString(), is("test")); + } + + @Test + void assertDecryptNullValue() { + assertNull(encryptAlgorithm.decrypt(null, mock(EncryptContext.class))); + } +} \ No newline at end of file diff --git a/jdbc/driver/pom.xml b/jdbc/driver/pom.xml new file mode 100644 index 0000000..4af282b --- /dev/null +++ b/jdbc/driver/pom.xml @@ -0,0 +1,52 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.shardingsphere</groupId> + <artifactId>shardingsphere-plugin-jdbc</artifactId> + <version>5.4.1-SNAPSHOT</version> + </parent> + <artifactId>shardingsphere-plugin-jdbc-driver</artifactId> + <name>${project.artifactId}</name> + + <properties> + <nacos.version>1.4.2</nacos.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.shardingsphere</groupId> + <artifactId>shardingsphere-jdbc-core</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.shardingsphere</groupId> + <artifactId>shardingsphere-test-util</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>com.ctrip.framework.apollo</groupId> + <artifactId>apollo-client</artifactId> + </dependency> + </dependencies> +</project> diff --git a/jdbc/driver/src/main/java/org/apache/shardingsphere/jdbc/driver/ApolloURLProvider.java b/jdbc/driver/src/main/java/org/apache/shardingsphere/jdbc/driver/ApolloURLProvider.java new file mode 100644 index 0000000..3f2e1a3 --- /dev/null +++ b/jdbc/driver/src/main/java/org/apache/shardingsphere/jdbc/driver/ApolloURLProvider.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.jdbc.driver; + +import com.ctrip.framework.apollo.ConfigFile; +import com.ctrip.framework.apollo.ConfigService; +import com.ctrip.framework.apollo.core.enums.ConfigFileFormat; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import org.apache.shardingsphere.driver.jdbc.core.driver.ShardingSphereURLProvider; + +import java.nio.charset.StandardCharsets; + +/** + * Apollo URL provider. + */ +public final class ApolloURLProvider implements ShardingSphereURLProvider { + + private static final String APOLLO_TYPE = "apollo:"; + + @Override + public boolean accept(final String url) { + return !Strings.isNullOrEmpty(url) && url.contains(APOLLO_TYPE); + } + + @Override + public byte[] getContent(final String url, final String urlPrefix) { + String configPath = url.substring(urlPrefix.length(), url.contains("?") ? url.indexOf('?') : url.length()); + String namespace = configPath.substring(APOLLO_TYPE.length()); + Preconditions.checkArgument(!namespace.isEmpty(), "Apollo namespace is required in ShardingSphere URL."); + ConfigFile configFile = ConfigService.getConfigFile(namespace, ConfigFileFormat.YAML); + return configFile.getContent().getBytes(StandardCharsets.UTF_8); + } +} diff --git a/jdbc/driver/src/main/resources/META-INF/services/org.apache.shardingsphere.driver.jdbc.core.driver.ShardingSphereURLProvider b/jdbc/driver/src/main/resources/META-INF/services/org.apache.shardingsphere.driver.jdbc.core.driver.ShardingSphereURLProvider new file mode 100644 index 0000000..1f988c6 --- /dev/null +++ b/jdbc/driver/src/main/resources/META-INF/services/org.apache.shardingsphere.driver.jdbc.core.driver.ShardingSphereURLProvider @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.shardingsphere.jdbc.driver.ApolloURLProvider diff --git a/jdbc/pom.xml b/jdbc/pom.xml new file mode 100644 index 0000000..ecfa94a --- /dev/null +++ b/jdbc/pom.xml @@ -0,0 +1,33 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.shardingsphere</groupId> + <artifactId>shardingsphere-plugin</artifactId> + <version>5.4.1-SNAPSHOT</version> + </parent> + <artifactId>shardingsphere-plugin-jdbc</artifactId> + <packaging>pom</packaging> + <name>${project.artifactId}</name> + + <modules> + <module>driver</module> + </modules> +</project> diff --git a/mode/cluster/pom.xml b/mode/cluster/pom.xml new file mode 100644 index 0000000..3997fd1 --- /dev/null +++ b/mode/cluster/pom.xml @@ -0,0 +1,33 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.shardingsphere</groupId> + <artifactId>shardingsphere-plugin-mode</artifactId> + <version>5.4.1-SNAPSHOT</version> + </parent> + <artifactId>shardingsphere-plugin-mode-cluster</artifactId> + <packaging>pom</packaging> + <name>${project.artifactId}</name> + + <modules> + <module>repository</module> + </modules> +</project> diff --git a/mode/cluster/repository/nacos/pom.xml b/mode/cluster/repository/nacos/pom.xml new file mode 100644 index 0000000..39ce8b8 --- /dev/null +++ b/mode/cluster/repository/nacos/pom.xml @@ -0,0 +1,53 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.shardingsphere</groupId> + <artifactId>shardingsphere-plugin-mode-cluster-repository</artifactId> + <version>5.4.1-SNAPSHOT</version> + </parent> + <artifactId>shardingsphere-plugin-mode-cluster-repository-nacos</artifactId> + <name>${project.artifactId}</name> + + <properties> + <nacos.version>1.4.2</nacos.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.shardingsphere</groupId> + <artifactId>shardingsphere-cluster-mode-repository-api</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.shardingsphere</groupId> + <artifactId>shardingsphere-test-util</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>com.alibaba.nacos</groupId> + <artifactId>nacos-client</artifactId> + <version>${nacos.version}</version> + </dependency> + </dependencies> +</project> diff --git a/mode/cluster/repository/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/NacosRepository.java b/mode/cluster/repository/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/NacosRepository.java new file mode 100644 index 0000000..0a07680 --- /dev/null +++ b/mode/cluster/repository/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/NacosRepository.java @@ -0,0 +1,363 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.mode.repository.cluster.nacos; + +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.api.naming.NamingFactory; +import com.alibaba.nacos.api.naming.NamingService; +import com.alibaba.nacos.api.naming.PreservedMetadataKeys; +import com.alibaba.nacos.api.naming.pojo.Instance; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import lombok.SneakyThrows; +import org.apache.shardingsphere.infra.instance.util.IpUtils; +import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository; +import org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration; +import org.apache.shardingsphere.mode.repository.cluster.exception.ClusterPersistRepositoryException; +import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener; +import org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder; +import org.apache.shardingsphere.mode.repository.cluster.nacos.entity.KeyValue; +import org.apache.shardingsphere.mode.repository.cluster.nacos.entity.ServiceController; +import org.apache.shardingsphere.mode.repository.cluster.nacos.entity.ServiceMetaData; +import org.apache.shardingsphere.mode.repository.cluster.nacos.listener.NamingEventListener; +import org.apache.shardingsphere.mode.repository.cluster.nacos.props.NacosProperties; +import org.apache.shardingsphere.mode.repository.cluster.nacos.props.NacosPropertyKey; +import org.apache.shardingsphere.mode.repository.cluster.nacos.util.NacosMetaDataUtils; + +import java.security.SecureRandom; +import java.util.Collection; +import java.util.Comparator; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Optional; +import java.util.Properties; +import java.util.Random; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Registry repository of Nacos. + */ +public final class NacosRepository implements ClusterPersistRepository { + + private final Random random = new SecureRandom(); + + private NamingService client; + + private NacosProperties nacosProps; + + private ServiceController serviceController; + + @Override + public void init(final ClusterPersistRepositoryConfiguration config) { + nacosProps = new NacosProperties(config.getProps()); + client = createClient(config); + initServiceMetaData(); + } + + private NamingService createClient(final ClusterPersistRepositoryConfiguration config) { + Properties props = new Properties(); + props.setProperty("serverAddr", config.getServerLists()); + props.setProperty("namespace", config.getNamespace()); + props.setProperty("username", nacosProps.getValue(NacosPropertyKey.USERNAME)); + props.setProperty("password", nacosProps.getValue(NacosPropertyKey.PASSWORD)); + try { + return NamingFactory.createNamingService(props); + } catch (final NacosException ex) { + throw new ClusterPersistRepositoryException(ex); + } + } + + private void initServiceMetaData() { + try { + String clusterIp = nacosProps.getValue(NacosPropertyKey.CLUSTER_IP); + String ip = Strings.isNullOrEmpty(clusterIp) ? IpUtils.getIp() : clusterIp; + serviceController = new ServiceController(); + for (ServiceMetaData each : serviceController.getAllServices()) { + Integer port = client.getAllInstances(each.getServiceName(), false).stream() + .filter(instance -> ip.equals(instance.getIp())).map(Instance::getPort).max(Comparator.naturalOrder()).orElse(Integer.MIN_VALUE); + each.setIp(ip); + each.setPort(new AtomicInteger(port)); + } + } catch (final NacosException ex) { + throw new ClusterPersistRepositoryException(ex); + } + } + + @Override + public void persistEphemeral(final String key, final String value) { + try { + Preconditions.checkNotNull(value, "Value can not be null"); + if (!findExistedInstance(key, true).isEmpty()) { + delete(key); + } + put(key, value, true); + } catch (final NacosException ex) { + throw new ClusterPersistRepositoryException(ex); + } + } + + @Override + public void persistExclusiveEphemeral(final String key, final String value) { + try { + Preconditions.checkState(findExistedInstance(key, true).isEmpty(), "Key `%s` already exists", key); + put(key, value, true); + } catch (final NacosException ex) { + throw new ClusterPersistRepositoryException(ex); + } + } + + @Override + public DistributedLockHolder getDistributedLockHolder() { + return null; + } + + @Override + public void watch(final String key, final DataChangedEventListener listener) { + try { + for (ServiceMetaData each : serviceController.getAllServices()) { + NamingEventListener eventListener = each.getListener(); + if (null != eventListener) { + eventListener.put(key, listener); + return; + } + eventListener = new NamingEventListener(); + eventListener.put(key, listener); + each.setListener(eventListener); + client.subscribe(each.getServiceName(), eventListener); + } + } catch (final NacosException ex) { + throw new ClusterPersistRepositoryException(ex); + } + } + + @Override + public String getDirectly(final String key) { + try { + for (ServiceMetaData each : serviceController.getAllServices()) { + Optional<Instance> instance = findExistedInstance(key, each.isEphemeral()).stream().max(Comparator.comparing(NacosMetaDataUtils::getTimestamp)); + if (instance.isPresent()) { + return NacosMetaDataUtils.getValue(instance.get()); + } + } + return null; + } catch (final NacosException ex) { + throw new ClusterPersistRepositoryException(ex); + } + } + + @Override + public List<String> getChildrenKeys(final String key) { + try { + Stream<String> concatKeys = Stream.empty(); + for (ServiceMetaData each : serviceController.getAllServices()) { + Stream<String> keys = findExistedInstance(each.isEphemeral()).stream() + .map(instance -> { + String fullPath = NacosMetaDataUtils.getKey(instance); + if (fullPath.startsWith(key + PATH_SEPARATOR)) { + String pathWithoutPrefix = fullPath.substring((key + PATH_SEPARATOR).length()); + return pathWithoutPrefix.contains(PATH_SEPARATOR) ? pathWithoutPrefix.substring(0, pathWithoutPrefix.indexOf(PATH_SEPARATOR)) : pathWithoutPrefix; + } + return null; + }).filter(Objects::nonNull); + concatKeys = Stream.concat(concatKeys, keys); + } + return concatKeys.distinct().sorted(Comparator.reverseOrder()).collect(Collectors.toList()); + } catch (final NacosException ex) { + throw new ClusterPersistRepositoryException(ex); + } + } + + @Override + public boolean isExisted(final String key) { + return false; + } + + @Override + public void persist(final String key, final String value) { + try { + Preconditions.checkNotNull(value, "Value can not be null"); + Optional<Instance> instance = findExistedInstance(key, false).stream().max(Comparator.comparing(NacosMetaDataUtils::getTimestamp)); + if (instance.isPresent()) { + update(instance.get(), value); + } else { + put(key, value, false); + } + } catch (final NacosException ex) { + throw new ClusterPersistRepositoryException(ex); + } + } + + @Override + public void update(final String key, final String value) { + // TODO + } + + private void update(final Instance instance, final String value) throws NacosException { + Map<String, String> metaDataMap = instance.getMetadata(); + String key = NacosMetaDataUtils.getKey(instance); + metaDataMap.put(key, value); + metaDataMap.put(NacosMetaDataUtils.UTC_ZONE_OFFSET.toString(), String.valueOf(NacosMetaDataUtils.getTimestamp())); + instance.setMetadata(metaDataMap); + ServiceMetaData persistentService = serviceController.getPersistentService(); + client.registerInstance(persistentService.getServiceName(), instance); + Collection<KeyValue> keyValues = new LinkedList<>(); + keyValues.add(new KeyValue(key, value, instance.isEphemeral())); + waitValue(keyValues); + } + + private void put(final String key, final String value, final boolean ephemeral) throws NacosException { + final Collection<KeyValue> keyValues = buildParentPath(key); + ServiceMetaData serviceMetaData = serviceController.getService(ephemeral); + Instance instance = new Instance(); + instance.setIp(serviceMetaData.getIp()); + instance.setPort(serviceMetaData.getPort()); + instance.setEphemeral(ephemeral); + Map<String, String> metadataMap = new HashMap<>(5, 1F); + if (ephemeral) { + fillEphemeralMetaData(metadataMap); + } + metadataMap.put(key, value); + metadataMap.put(NacosMetaDataUtils.UTC_ZONE_OFFSET.toString(), String.valueOf(NacosMetaDataUtils.getTimestamp())); + instance.setMetadata(metadataMap); + client.registerInstance(serviceMetaData.getServiceName(), instance); + keyValues.add(new KeyValue(key, value, ephemeral)); + waitValue(keyValues); + } + + private Collection<KeyValue> buildParentPath(final String key) throws NacosException { + Collection<KeyValue> result = new LinkedList<>(); + StringBuilder parentPath = new StringBuilder(); + String[] partPath = key.split(PATH_SEPARATOR); + for (int index = 1; index < partPath.length - 1; index++) { + String path = parentPath.append(PATH_SEPARATOR).append(partPath[index]).toString(); + if (findExistedInstance(path, false).isEmpty()) { + result.addAll(build(path)); + } + } + return result; + } + + private Collection<KeyValue> build(final String key) throws NacosException { + Collection<KeyValue> result = new LinkedList<>(); + if (findExistedInstance(key, false).isEmpty()) { + Instance instance = new Instance(); + ServiceMetaData persistentService = serviceController.getPersistentService(); + instance.setIp(persistentService.getIp()); + instance.setPort(persistentService.getPort()); + instance.setEphemeral(false); + Map<String, String> metaDataMap = new HashMap<>(2, 1F); + metaDataMap.put(key, ""); + metaDataMap.put(NacosMetaDataUtils.UTC_ZONE_OFFSET.toString(), String.valueOf(NacosMetaDataUtils.getTimestamp())); + instance.setMetadata(metaDataMap); + client.registerInstance(persistentService.getServiceName(), instance); + result.add(new KeyValue(key, "", false)); + } + return result; + } + + private void fillEphemeralMetaData(final Map<String, String> metaDataMap) { + int timeToLiveSeconds = nacosProps.getValue(NacosPropertyKey.TIME_TO_LIVE_SECONDS); + metaDataMap.put(PreservedMetadataKeys.HEART_BEAT_INTERVAL, String.valueOf(timeToLiveSeconds * 1000 / 3)); + metaDataMap.put(PreservedMetadataKeys.HEART_BEAT_TIMEOUT, String.valueOf(timeToLiveSeconds * 1000 * 2 / 3)); + metaDataMap.put(PreservedMetadataKeys.IP_DELETE_TIMEOUT, String.valueOf(timeToLiveSeconds * 1000)); + } + + @Override + public void delete(final String key) { + try { + for (ServiceMetaData each : serviceController.getAllServices()) { + Collection<Instance> instances = findExistedInstance(each.isEphemeral()).stream() + .filter(instance -> { + String fullPath = NacosMetaDataUtils.getKey(instance); + return fullPath.startsWith(key + PATH_SEPARATOR) || key.equals(fullPath); + }) + .sorted(Comparator.comparing(NacosMetaDataUtils::getKey).reversed()).collect(Collectors.toList()); + Collection<KeyValue> keyValues = new LinkedList<>(); + for (Instance instance : instances) { + client.deregisterInstance(each.getServiceName(), instance); + keyValues.add(new KeyValue(NacosMetaDataUtils.getKey(instance), null, each.isEphemeral())); + } + waitValue(keyValues); + } + } catch (final NacosException ex) { + throw new ClusterPersistRepositoryException(ex); + } + } + + private Collection<Instance> findExistedInstance(final String key, final boolean ephemeral) throws NacosException { + return client.getAllInstances(serviceController.getService(ephemeral).getServiceName(), false).stream() + .filter(each -> Objects.equals(key, NacosMetaDataUtils.getKey(each))).collect(Collectors.toList()); + } + + private Collection<Instance> findExistedInstance(final boolean ephemeral) throws NacosException { + return client.getAllInstances(serviceController.getService(ephemeral).getServiceName(), false); + } + + @SneakyThrows(InterruptedException.class) + private void waitValue(final Collection<KeyValue> keyValues) throws NacosException { + if (!isAvailable(keyValues)) { + long retryIntervalMilliseconds = nacosProps.getValue(NacosPropertyKey.RETRY_INTERVAL_MILLISECONDS); + int maxRetries = nacosProps.getValue(NacosPropertyKey.MAX_RETRIES); + for (int retry = 0; retry < maxRetries; retry++) { + Thread.sleep(getSleepTimeMs(retry, retryIntervalMilliseconds)); + if (isAvailable(keyValues)) { + return; + } + } + throw new NacosException(NacosException.RESOURCE_NOT_FOUND, "Wait value availability timeout exceeded"); + } + } + + private boolean isAvailable(final Collection<KeyValue> keyValues) throws NacosException { + Map<Boolean, List<KeyValue>> keyValueMap = keyValues.stream().collect(Collectors.groupingBy(KeyValue::isEphemeral)); + for (Entry<Boolean, List<KeyValue>> entry : keyValueMap.entrySet()) { + ServiceMetaData service = serviceController.getService(entry.getKey()); + Map<String, List<Instance>> instanceMap = client.getAllInstances(service.getServiceName(), false).stream().collect(Collectors.groupingBy(NacosMetaDataUtils::getKey)); + keyValues.removeIf(keyValue -> { + String key = keyValue.getKey(); + String value = keyValue.getValue(); + return instanceMap.containsKey(key) ? instanceMap.get(key).stream().anyMatch(each -> Objects.equals(NacosMetaDataUtils.getValue(each), value)) : null == value; + }); + } + return keyValues.isEmpty(); + } + + private long getSleepTimeMs(final int retryCount, final long baseSleepTimeMs) { + return baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1))); + } + + @Override + public void close() { + try { + client.shutDown(); + } catch (final NacosException ex) { + throw new ClusterPersistRepositoryException(ex); + } + } + + @Override + public String getType() { + return "Nacos"; + } +} diff --git a/mode/cluster/repository/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/entity/KeyValue.java b/mode/cluster/repository/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/entity/KeyValue.java new file mode 100644 index 0000000..a11b5cb --- /dev/null +++ b/mode/cluster/repository/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/entity/KeyValue.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.mode.repository.cluster.nacos.entity; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +/** + * Key value. + */ +@RequiredArgsConstructor +@Getter +public final class KeyValue { + + private final String key; + + private final String value; + + private final boolean ephemeral; +} diff --git a/mode/cluster/repository/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/entity/ServiceController.java b/mode/cluster/repository/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/entity/ServiceController.java new file mode 100644 index 0000000..5b61396 --- /dev/null +++ b/mode/cluster/repository/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/entity/ServiceController.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.mode.repository.cluster.nacos.entity; + +import lombok.Getter; + +import java.util.Collection; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Service controller. + */ +public final class ServiceController { + + private static final String PERSISTENT_SERVICE_NAME = "PERSISTENT"; + + private static final String EPHEMERAL_SERVICE_NAME = "EPHEMERAL"; + + @Getter + private final ServiceMetaData persistentService = new ServiceMetaData(PERSISTENT_SERVICE_NAME, false); + + @Getter + private final ServiceMetaData ephemeralService = new ServiceMetaData(EPHEMERAL_SERVICE_NAME, true); + + private final Map<Boolean, ServiceMetaData> serviceMap = Stream.of(persistentService, ephemeralService).collect(Collectors.toMap(ServiceMetaData::isEphemeral, Function.identity())); + + /** + * Get all services. + * + * @return all services + */ + public Collection<ServiceMetaData> getAllServices() { + return serviceMap.values(); + } + + /** + * Get service. + * + * @param ephemeral is ephemeral service + * @return ephemeral service or persistent service + */ + public ServiceMetaData getService(final boolean ephemeral) { + return serviceMap.get(ephemeral); + } +} diff --git a/mode/cluster/repository/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/entity/ServiceMetaData.java b/mode/cluster/repository/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/entity/ServiceMetaData.java new file mode 100644 index 0000000..75314a5 --- /dev/null +++ b/mode/cluster/repository/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/entity/ServiceMetaData.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.mode.repository.cluster.nacos.entity; + +import com.google.common.base.Preconditions; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.Setter; +import org.apache.shardingsphere.mode.repository.cluster.nacos.listener.NamingEventListener; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Service meta data. + */ +@RequiredArgsConstructor +@Getter +@Setter +public final class ServiceMetaData { + + private final String serviceName; + + private String ip; + + private AtomicInteger port; + + private NamingEventListener listener; + + private final boolean ephemeral; + + /** + * Get incremental port. + * + * @return incremental port + */ + public int getPort() { + int result = port.incrementAndGet(); + Preconditions.checkState(Integer.MIN_VALUE != result, "Specified cluster ip exceeded the maximum number of persisting"); + return result; + } +} diff --git a/mode/cluster/repository/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/listener/NamingEventListener.java b/mode/cluster/repository/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/listener/NamingEventListener.java new file mode 100644 index 0000000..3f3dc94 --- /dev/null +++ b/mode/cluster/repository/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/listener/NamingEventListener.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.mode.repository.cluster.nacos.listener; + +import com.alibaba.nacos.api.naming.listener.Event; +import com.alibaba.nacos.api.naming.listener.EventListener; +import com.alibaba.nacos.api.naming.listener.NamingEvent; +import com.alibaba.nacos.api.naming.pojo.Instance; +import org.apache.shardingsphere.mode.event.DataChangedEvent; +import org.apache.shardingsphere.mode.event.DataChangedEvent.Type; +import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener; +import org.apache.shardingsphere.mode.repository.cluster.nacos.util.NacosMetaDataUtils; + +import java.util.Collection; +import java.util.Comparator; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * Naming event listener. + */ +public final class NamingEventListener implements EventListener { + + private Map<String, Instance> preInstances = new HashMap<>(); + + private final Map<String, DataChangedEventListener> prefixListenerMap = new HashMap<>(); + + @Override + public void onEvent(final Event event) { + if (!(event instanceof NamingEvent)) { + return; + } + NamingEvent namingEvent = (NamingEvent) event; + Collection<Instance> instances = namingEvent.getInstances().stream().sorted(Comparator.comparing(NacosMetaDataUtils::getKey)).collect(Collectors.toList()); + Collection<WatchData> watchDataList = new LinkedList<>(); + synchronized (this) { + instances.forEach(instance -> prefixListenerMap.forEach((prefixPath, listener) -> { + String key = NacosMetaDataUtils.getKey(instance); + if (key.startsWith(prefixPath)) { + Instance preInstance = preInstances.remove(key); + WatchData watchData = new WatchData(key, preInstance, instance, listener); + watchDataList.add(watchData); + } + })); + preInstances.values().stream().sorted(Comparator.comparing(NacosMetaDataUtils::getKey).reversed()).forEach(instance -> prefixListenerMap.forEach((prefixPath, listener) -> { + String key = NacosMetaDataUtils.getKey(instance); + if (key.startsWith(prefixPath)) { + WatchData watchData = new WatchData(key, instance, null, listener); + watchDataList.add(watchData); + } + })); + watchDataList.forEach(this::watch); + setPreInstances(instances); + } + } + + private void watch(final WatchData watchData) { + String key = watchData.getKey(); + Instance preInstance = watchData.getPreInstance(); + Instance instance = watchData.getInstance(); + DataChangedEventListener listener = watchData.getListener(); + Type changedType = getEventChangedType(preInstance, instance); + switch (changedType) { + case ADDED: + case UPDATED: + listener.onChange(new DataChangedEvent(key, NacosMetaDataUtils.getValue(instance), changedType)); + break; + case DELETED: + listener.onChange(new DataChangedEvent(key, NacosMetaDataUtils.getValue(preInstance), changedType)); + break; + default: + } + } + + private Type getEventChangedType(final Instance preInstance, final Instance instance) { + if (null == preInstance && null != instance) { + return Type.ADDED; + } + if (null != preInstance && null != instance && NacosMetaDataUtils.getTimestamp(preInstance) != NacosMetaDataUtils.getTimestamp(instance)) { + return Type.UPDATED; + } + if (null != preInstance && null == instance) { + return Type.DELETED; + } + return Type.IGNORED; + } + + /** + * Update pre instances. + * + * @param instances instances + */ + public void setPreInstances(final Collection<Instance> instances) { + preInstances = instances.stream().filter(instance -> { + for (String each : prefixListenerMap.keySet()) { + if (NacosMetaDataUtils.getKey(instance).startsWith(each)) { + return true; + } + } + return false; + }).collect(Collectors.toMap(NacosMetaDataUtils::getKey, Function.identity(), + (oldValue, currentValue) -> NacosMetaDataUtils.getTimestamp(oldValue) > NacosMetaDataUtils.getTimestamp(currentValue) ? oldValue : currentValue)); + } + + /** + * Put prefix path and listener. + * + * @param prefixPath prefix path + * @param listener listener + */ + public synchronized void put(final String prefixPath, final DataChangedEventListener listener) { + prefixListenerMap.put(prefixPath, listener); + } +} diff --git a/mode/cluster/repository/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/listener/WatchData.java b/mode/cluster/repository/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/listener/WatchData.java new file mode 100644 index 0000000..939bb03 --- /dev/null +++ b/mode/cluster/repository/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/listener/WatchData.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.mode.repository.cluster.nacos.listener; + +import com.alibaba.nacos.api.naming.pojo.Instance; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener; + +/** + * Watch data. + */ +@Getter +@RequiredArgsConstructor +public final class WatchData { + + private final String key; + + private final Instance preInstance; + + private final Instance instance; + + private final DataChangedEventListener listener; +} diff --git a/mode/cluster/repository/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/props/NacosProperties.java b/mode/cluster/repository/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/props/NacosProperties.java new file mode 100644 index 0000000..5b4e3a5 --- /dev/null +++ b/mode/cluster/repository/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/props/NacosProperties.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.mode.repository.cluster.nacos.props; + +import org.apache.shardingsphere.infra.props.TypedProperties; + +import java.util.Properties; + +/** + * Typed properties of Nacos. + */ +public final class NacosProperties extends TypedProperties<NacosPropertyKey> { + + public NacosProperties(final Properties props) { + super(NacosPropertyKey.class, props); + } +} diff --git a/mode/cluster/repository/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/props/NacosPropertyKey.java b/mode/cluster/repository/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/props/NacosPropertyKey.java new file mode 100644 index 0000000..fbec54a --- /dev/null +++ b/mode/cluster/repository/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/props/NacosPropertyKey.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.mode.repository.cluster.nacos.props; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import org.apache.shardingsphere.infra.props.TypedPropertyKey; + +/** + * Typed property key of Nacos. + */ +@RequiredArgsConstructor +@Getter +public enum NacosPropertyKey implements TypedPropertyKey { + + /** + * Cluster ip. + */ + CLUSTER_IP("clusterIp", "", String.class), + + /** + * Retry interval milliseconds when checking whether value is available. + */ + RETRY_INTERVAL_MILLISECONDS("retryIntervalMilliseconds", String.valueOf(500), long.class), + + /** + * Max Retry times when checking whether value is available. + */ + MAX_RETRIES("maxRetries", String.valueOf(3), int.class), + + /** + * Time to live seconds. + */ + TIME_TO_LIVE_SECONDS("timeToLiveSeconds", String.valueOf(30), int.class), + + /** + * Username. + */ + USERNAME("username", "", String.class), + + /** + * Password. + */ + PASSWORD("password", "", String.class); + + private final String key; + + private final String defaultValue; + + private final Class<?> type; +} diff --git a/mode/cluster/repository/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/util/NacosMetaDataUtils.java b/mode/cluster/repository/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/util/NacosMetaDataUtils.java new file mode 100644 index 0000000..c56447c --- /dev/null +++ b/mode/cluster/repository/nacos/src/main/java/org/apache/shardingsphere/mode/repository/cluster/nacos/util/NacosMetaDataUtils.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.mode.repository.cluster.nacos.util; + +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.api.naming.PreservedMetadataKeys; +import com.alibaba.nacos.api.naming.pojo.Instance; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import lombok.SneakyThrows; + +import java.time.LocalDateTime; +import java.time.ZoneOffset; + +/** + * Nacos meta data utility class. + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public final class NacosMetaDataUtils { + + public static final ZoneOffset UTC_ZONE_OFFSET = ZoneOffset.of("+8"); + + /** + * Get timestamp. + * + * @param instance instance + * @return timestamp + */ + public static long getTimestamp(final Instance instance) { + return Long.parseLong(instance.getMetadata().get(UTC_ZONE_OFFSET.toString())); + } + + /** + * Get timestamp. + * + * @return timeStamp + */ + public static long getTimestamp() { + return LocalDateTime.now().toInstant(UTC_ZONE_OFFSET).toEpochMilli(); + } + + /** + * Get value. + * + * @param instance instance + * @return value + */ + public static String getValue(final Instance instance) { + return instance.getMetadata().get(getKey(instance)); + } + + /** + * Get key. + * + * @param instance instance + * @return key + */ + @SneakyThrows(NacosException.class) + public static String getKey(final Instance instance) { + return instance.getMetadata().keySet().stream() + .filter(entryKey -> !PreservedMetadataKeys.HEART_BEAT_INTERVAL.equals(entryKey) + && !PreservedMetadataKeys.HEART_BEAT_TIMEOUT.equals(entryKey) + && !PreservedMetadataKeys.IP_DELETE_TIMEOUT.equals(entryKey) + && !UTC_ZONE_OFFSET.toString().equals(entryKey)) + .findFirst().orElseThrow(() -> new NacosException(NacosException.RESOURCE_NOT_FOUND, "Failed to find key ")); + } +} diff --git a/mode/cluster/repository/nacos/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository b/mode/cluster/repository/nacos/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository new file mode 100644 index 0000000..7ab9d96 --- /dev/null +++ b/mode/cluster/repository/nacos/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.shardingsphere.mode.repository.cluster.nacos.NacosRepository diff --git a/mode/cluster/repository/nacos/src/test/java/org/apache/shardingsphere/mode/repository/cluster/nacos/NacosRepositoryTest.java b/mode/cluster/repository/nacos/src/test/java/org/apache/shardingsphere/mode/repository/cluster/nacos/NacosRepositoryTest.java new file mode 100644 index 0000000..efccc41 --- /dev/null +++ b/mode/cluster/repository/nacos/src/test/java/org/apache/shardingsphere/mode/repository/cluster/nacos/NacosRepositoryTest.java @@ -0,0 +1,377 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.mode.repository.cluster.nacos; + +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.api.naming.NamingService; +import com.alibaba.nacos.api.naming.PreservedMetadataKeys; +import com.alibaba.nacos.api.naming.listener.Event; +import com.alibaba.nacos.api.naming.listener.EventListener; +import com.alibaba.nacos.api.naming.listener.NamingEvent; +import com.alibaba.nacos.api.naming.pojo.Instance; +import com.google.common.util.concurrent.SettableFuture; +import org.apache.shardingsphere.mode.event.DataChangedEvent; +import org.apache.shardingsphere.mode.repository.cluster.exception.ClusterPersistRepositoryException; +import org.apache.shardingsphere.mode.repository.cluster.nacos.entity.ServiceController; +import org.apache.shardingsphere.mode.repository.cluster.nacos.entity.ServiceMetaData; +import org.apache.shardingsphere.mode.repository.cluster.nacos.props.NacosProperties; +import org.apache.shardingsphere.mode.repository.cluster.nacos.props.NacosPropertyKey; +import org.apache.shardingsphere.mode.repository.cluster.nacos.util.NacosMetaDataUtils; +import org.apache.shardingsphere.mode.spi.PersistRepository; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.AdditionalAnswers; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.internal.configuration.plugins.Plugins; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.plugins.MemberAccessor; +import org.mockito.stubbing.VoidAnswer2; + +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class NacosRepositoryTest { + + private static final NacosRepository REPOSITORY = new NacosRepository(); + + @Mock + private NamingService client; + + private ServiceController serviceController; + + @BeforeEach + void initClient() throws ReflectiveOperationException { + MemberAccessor accessor = Plugins.getMemberAccessor(); + accessor.set(REPOSITORY.getClass().getDeclaredField("nacosProps"), REPOSITORY, new NacosProperties(new Properties())); + accessor.set(REPOSITORY.getClass().getDeclaredField("client"), REPOSITORY, client); + accessor.invoke(REPOSITORY.getClass().getDeclaredMethod("initServiceMetaData"), REPOSITORY); + serviceController = (ServiceController) accessor.get(REPOSITORY.getClass().getDeclaredField("serviceController"), REPOSITORY); + } + + @Test + void assertGetLatestKey() throws NacosException { + int total = 2; + String key = "/test/children/keys/persistent/1"; + List<Instance> instances = new LinkedList<>(); + for (int count = 1; count <= total; count++) { + Instance instance = new Instance(); + Map<String, String> metaDataMap = new HashMap<>(2, 1F); + metaDataMap.put(key, "value" + count); + metaDataMap.put(NacosMetaDataUtils.UTC_ZONE_OFFSET.toString(), String.valueOf(count)); + instance.setMetadata(metaDataMap); + instances.add(instance); + } + ServiceMetaData persistentService = serviceController.getPersistentService(); + when(client.getAllInstances(persistentService.getServiceName(), false)).thenReturn(instances); + String value = REPOSITORY.getDirectly(key); + assertThat(value, is("value2")); + } + + @Test + void assertGetChildrenKeys() throws NacosException { + Instance instance = new Instance(); + String key = "/test/children/keys/persistent/0"; + instance.setMetadata(Collections.singletonMap(key, "value0")); + ServiceMetaData persistentService = serviceController.getPersistentService(); + when(client.getAllInstances(persistentService.getServiceName(), false)).thenReturn(Collections.singletonList(instance)); + instance = new Instance(); + key = "/test/children/keys/ephemeral/0"; + instance.setMetadata(Collections.singletonMap(key, "value0")); + ServiceMetaData ephemeralService = serviceController.getEphemeralService(); + when(client.getAllInstances(ephemeralService.getServiceName(), false)).thenReturn(Collections.singletonList(instance)); + List<String> childrenKeys = REPOSITORY.getChildrenKeys("/test/children/keys"); + assertThat(childrenKeys.size(), is(2)); + assertThat(childrenKeys.get(0), is("persistent")); + assertThat(childrenKeys.get(1), is("ephemeral")); + } + + @Test + void assertPersistNotExistKey() throws NacosException { + String key = "/test/children/keys/persistent/1"; + doAnswer(AdditionalAnswers.answerVoid(getRegisterInstanceAnswer())).when(client).registerInstance(anyString(), any(Instance.class)); + REPOSITORY.persist(key, "value4"); + ArgumentCaptor<Instance> instanceArgumentCaptor = ArgumentCaptor.forClass(Instance.class); + ArgumentCaptor<String> stringArgumentCaptor = ArgumentCaptor.forClass(String.class); + verify(client, times(5)).registerInstance(stringArgumentCaptor.capture(), instanceArgumentCaptor.capture()); + Instance registerInstance = instanceArgumentCaptor.getValue(); + String registerType = stringArgumentCaptor.getValue(); + ServiceMetaData persistentService = serviceController.getPersistentService(); + assertThat(registerType, is(persistentService.getServiceName())); + assertThat(registerInstance.isEphemeral(), is(false)); + assertThat(NacosMetaDataUtils.getValue(registerInstance), is("value4")); + } + + @Test + void assertPersistExistKey() throws NacosException { + String ip = "127.0.0.1"; + Instance instance = new Instance(); + instance.setIp(ip); + instance.setEphemeral(false); + String key = "/test/children/keys/persistent/0"; + instance.setMetadata(new HashMap<>(Collections.singletonMap(key, "value0"))); + List<Instance> instances = new LinkedList<>(); + buildParentPath(key, instances); + instances.add(instance); + ServiceMetaData persistentService = serviceController.getPersistentService(); + when(client.getAllInstances(persistentService.getServiceName(), false)).thenReturn(instances); + doAnswer(AdditionalAnswers.answerVoid(getRegisterInstanceAnswer())).when(client).registerInstance(anyString(), any(Instance.class)); + REPOSITORY.persist(key, "value4"); + ArgumentCaptor<Instance> instanceArgumentCaptor = ArgumentCaptor.forClass(Instance.class); + ArgumentCaptor<String> stringArgumentCaptor = ArgumentCaptor.forClass(String.class); + verify(client).registerInstance(stringArgumentCaptor.capture(), instanceArgumentCaptor.capture()); + Instance registerInstance = instanceArgumentCaptor.getValue(); + String registerType = stringArgumentCaptor.getValue(); + assertThat(registerType, is(persistentService.getServiceName())); + assertThat(registerInstance.getIp(), is(ip)); + assertThat(registerInstance.isEphemeral(), is(false)); + assertThat(NacosMetaDataUtils.getValue(registerInstance), is("value4")); + } + + @Test + void assertPersistEphemeralExistKey() throws NacosException { + final String key = "/test/children/keys/ephemeral/1"; + final Instance instance = new Instance(); + instance.setEphemeral(true); + Map<String, String> metaDataMap = new HashMap<>(4, 1F); + metaDataMap.put(PreservedMetadataKeys.HEART_BEAT_INTERVAL, String.valueOf(2000)); + metaDataMap.put(PreservedMetadataKeys.HEART_BEAT_TIMEOUT, String.valueOf(4000)); + metaDataMap.put(PreservedMetadataKeys.IP_DELETE_TIMEOUT, String.valueOf(6000)); + metaDataMap.put(key, "value0"); + instance.setMetadata(metaDataMap); + List<Instance> instances = new LinkedList<>(); + buildParentPath(key, instances); + ServiceMetaData persistentService = serviceController.getPersistentService(); + when(client.getAllInstances(persistentService.getServiceName(), false)).thenReturn(instances); + instances = new LinkedList<>(); + instances.add(instance); + ServiceMetaData ephemeralService = serviceController.getEphemeralService(); + when(client.getAllInstances(ephemeralService.getServiceName(), false)).thenReturn(instances); + doAnswer(AdditionalAnswers.answerVoid(getDeregisterInstanceAnswer())).when(client).deregisterInstance(anyString(), any(Instance.class)); + doAnswer(AdditionalAnswers.answerVoid(getRegisterInstanceAnswer())).when(client).registerInstance(anyString(), any(Instance.class)); + REPOSITORY.persistEphemeral(key, "value4"); + ArgumentCaptor<Instance> instanceArgumentCaptor = ArgumentCaptor.forClass(Instance.class); + ArgumentCaptor<String> stringArgumentCaptor = ArgumentCaptor.forClass(String.class); + verify(client).deregisterInstance(anyString(), any(Instance.class)); + verify(client).registerInstance(stringArgumentCaptor.capture(), instanceArgumentCaptor.capture()); + Instance registerInstance = instanceArgumentCaptor.getValue(); + String registerType = stringArgumentCaptor.getValue(); + assertThat(registerType, is(ephemeralService.getServiceName())); + assertThat(registerInstance.isEphemeral(), is(true)); + assertThat(NacosMetaDataUtils.getValue(registerInstance), is("value4")); + Map<String, String> metaData = registerInstance.getMetadata(); + long timeToLiveSeconds = Long.parseLong(NacosPropertyKey.TIME_TO_LIVE_SECONDS.getDefaultValue()); + assertThat(metaData.get(PreservedMetadataKeys.HEART_BEAT_INTERVAL), is(String.valueOf(timeToLiveSeconds * 1000 / 3))); + assertThat(metaData.get(PreservedMetadataKeys.HEART_BEAT_TIMEOUT), is(String.valueOf(timeToLiveSeconds * 1000 * 2 / 3))); + assertThat(metaData.get(PreservedMetadataKeys.IP_DELETE_TIMEOUT), is(String.valueOf(timeToLiveSeconds * 1000))); + } + + private void buildParentPath(final String key, final List<Instance> instances) { + StringBuilder parentPath = new StringBuilder(); + final String[] partPath = key.split(PersistRepository.PATH_SEPARATOR); + for (int index = 1; index < partPath.length - 1; index++) { + parentPath.append(PersistRepository.PATH_SEPARATOR); + parentPath.append(partPath[index]); + String path = parentPath.toString(); + Instance instance = new Instance(); + instance.setEphemeral(false); + instance.setMetadata(Collections.singletonMap(path, "")); + instances.add(instance); + } + } + + @Test + void assertPersistEphemeralNotExistKey() throws NacosException { + String key = "/test/children/keys/ephemeral/0"; + doAnswer(AdditionalAnswers.answerVoid(getRegisterInstanceAnswer())).when(client).registerInstance(anyString(), any(Instance.class)); + REPOSITORY.persistEphemeral(key, "value0"); + ArgumentCaptor<Instance> instanceArgumentCaptor = ArgumentCaptor.forClass(Instance.class); + ArgumentCaptor<String> stringArgumentCaptor = ArgumentCaptor.forClass(String.class); + verify(client, times(5)).registerInstance(stringArgumentCaptor.capture(), instanceArgumentCaptor.capture()); + Instance registerInstance = instanceArgumentCaptor.getValue(); + String registerType = stringArgumentCaptor.getValue(); + ServiceMetaData ephemeralService = serviceController.getEphemeralService(); + assertThat(registerType, is(ephemeralService.getServiceName())); + assertThat(registerInstance.isEphemeral(), is(true)); + assertThat(NacosMetaDataUtils.getValue(registerInstance), is("value0")); + Map<String, String> metaData = registerInstance.getMetadata(); + long timeToLiveSeconds = Long.parseLong(NacosPropertyKey.TIME_TO_LIVE_SECONDS.getDefaultValue()); + assertThat(metaData.get(PreservedMetadataKeys.HEART_BEAT_INTERVAL), is(String.valueOf(timeToLiveSeconds * 1000 / 3))); + assertThat(metaData.get(PreservedMetadataKeys.HEART_BEAT_TIMEOUT), is(String.valueOf(timeToLiveSeconds * 1000 * 2 / 3))); + assertThat(metaData.get(PreservedMetadataKeys.IP_DELETE_TIMEOUT), is(String.valueOf(timeToLiveSeconds * 1000))); + } + + @Test + void assertDeleteExistKey() throws NacosException { + int total = 3; + List<Instance> instances = new LinkedList<>(); + for (int count = 1; count <= total; count++) { + String key = "/test/children/keys/ephemeral/" + count; + Instance instance = new Instance(); + instance.setEphemeral(true); + instance.setMetadata(Collections.singletonMap(key, "value" + count)); + instances.add(instance); + } + ServiceMetaData ephemeralService = serviceController.getEphemeralService(); + when(client.getAllInstances(ephemeralService.getServiceName(), false)).thenReturn(instances); + instances = new LinkedList<>(); + String key = "/test/children/keys/persistent/0"; + Instance instance = new Instance(); + instance.setEphemeral(false); + instance.setMetadata(Collections.singletonMap(key, "value0")); + instances.add(instance); + ServiceMetaData persistentService = serviceController.getPersistentService(); + when(client.getAllInstances(persistentService.getServiceName(), false)).thenReturn(instances); + doAnswer(AdditionalAnswers.answerVoid(getDeregisterInstanceAnswer())).when(client).deregisterInstance(anyString(), any(Instance.class)); + REPOSITORY.delete("/test/children/keys"); + verify(client, times(4)).deregisterInstance(anyString(), any(Instance.class)); + } + + @Test + void assertDeleteNotExistKey() throws NacosException { + REPOSITORY.delete("/test/children/keys/persistent/1"); + verify(client, times(0)).deregisterInstance(anyString(), any(Instance.class)); + } + + @Test + void assertWatchAdded() throws NacosException, ExecutionException, InterruptedException { + ServiceMetaData ephemeralService = serviceController.getEphemeralService(); + ephemeralService.setListener(null); + String key = "key/key"; + String value = "value2"; + Instance instance = new Instance(); + instance.setMetadata(Collections.singletonMap(key, value)); + Event event = new NamingEvent(ephemeralService.getServiceName(), Collections.singletonList(instance)); + doAnswer(AdditionalAnswers.answerVoid(getListenerAnswer(null, event))).when(client).subscribe(anyString(), any(EventListener.class)); + SettableFuture<DataChangedEvent> settableFuture = SettableFuture.create(); + REPOSITORY.watch(key, settableFuture::set); + DataChangedEvent dataChangedEvent = settableFuture.get(); + assertThat(dataChangedEvent.getType(), is(DataChangedEvent.Type.ADDED)); + assertThat(dataChangedEvent.getKey(), is(key)); + assertThat(dataChangedEvent.getValue(), is(value)); + } + + @Test + void assertWatchUpdate() throws NacosException, ExecutionException, InterruptedException { + ServiceMetaData persistentService = serviceController.getPersistentService(); + persistentService.setListener(null); + String key = "key/key"; + long epochMilliseconds = NacosMetaDataUtils.getTimestamp(); + Instance preInstance = new Instance(); + Map<String, String> metaDataMap = new HashMap<>(); + metaDataMap.put(key, "value1"); + metaDataMap.put(NacosMetaDataUtils.UTC_ZONE_OFFSET.toString(), String.valueOf(epochMilliseconds)); + preInstance.setMetadata(metaDataMap); + final Instance instance = new Instance(); + metaDataMap = new HashMap<>(); + metaDataMap.put(key, "value2"); + metaDataMap.put(NacosMetaDataUtils.UTC_ZONE_OFFSET.toString(), String.valueOf(epochMilliseconds + 1)); + instance.setMetadata(metaDataMap); + Event event = new NamingEvent(persistentService.getServiceName(), Collections.singletonList(instance)); + doAnswer(AdditionalAnswers.answerVoid(getListenerAnswer(preInstance, event))).when(client).subscribe(anyString(), any(EventListener.class)); + SettableFuture<DataChangedEvent> settableFuture = SettableFuture.create(); + REPOSITORY.watch(key, settableFuture::set); + DataChangedEvent dataChangedEvent = settableFuture.get(); + assertThat(dataChangedEvent.getType(), is(DataChangedEvent.Type.UPDATED)); + assertThat(dataChangedEvent.getKey(), is(key)); + assertThat(dataChangedEvent.getValue(), is("value2")); + } + + @Test + void assertWatchDelete() throws NacosException, ExecutionException, InterruptedException { + ServiceMetaData persistentService = serviceController.getPersistentService(); + persistentService.setListener(null); + String key = "key/key"; + Instance preInstance = new Instance(); + preInstance.setMetadata(Collections.singletonMap(key, "value1")); + Event event = new NamingEvent(persistentService.getServiceName(), Collections.emptyList()); + doAnswer(AdditionalAnswers.answerVoid(getListenerAnswer(preInstance, event))).when(client).subscribe(anyString(), any(EventListener.class)); + SettableFuture<DataChangedEvent> settableFuture = SettableFuture.create(); + REPOSITORY.watch(key, settableFuture::set); + DataChangedEvent dataChangedEvent = settableFuture.get(); + assertThat(dataChangedEvent.getType(), is(DataChangedEvent.Type.DELETED)); + assertThat(dataChangedEvent.getKey(), is(key)); + assertThat(dataChangedEvent.getValue(), is("value1")); + } + + @Test + void assertClose() throws NacosException { + REPOSITORY.close(); + verify(client).shutDown(); + } + + @Test + void assertPersistNotAvailable() { + assertThrows(ClusterPersistRepositoryException.class, () -> REPOSITORY.persist("/test/children/keys/persistent/1", "value4")); + } + + @Test + void assertExceededMaximum() { + ServiceMetaData ephemeralService = serviceController.getEphemeralService(); + ephemeralService.setPort(new AtomicInteger(Integer.MAX_VALUE)); + assertThrows(IllegalStateException.class, () -> REPOSITORY.persistEphemeral("/key2", "value")); + } + + private VoidAnswer2<String, EventListener> getListenerAnswer(final Instance preInstance, final Event event) { + return (serviceName, listener) -> { + MemberAccessor accessor = Plugins.getMemberAccessor(); + if (null != preInstance) { + Map<String, Instance> preInstances = new HashMap<>(); + preInstances.put(NacosMetaDataUtils.getKey(preInstance), preInstance); + accessor.set(listener.getClass().getDeclaredField("preInstances"), listener, preInstances); + } + listener.onEvent(event); + }; + } + + private VoidAnswer2<String, Instance> getRegisterInstanceAnswer() { + return (serviceName, instance) -> { + List<Instance> instances = client.getAllInstances(serviceName, false); + instances.removeIf(each -> Objects.equals(each.getIp(), instance.getIp()) && each.getPort() == instance.getPort()); + instances.add(instance); + when(client.getAllInstances(serviceName, false)).thenReturn(instances); + }; + } + + private VoidAnswer2<String, Instance> getDeregisterInstanceAnswer() { + return (serviceName, instance) -> { + List<Instance> instances = client.getAllInstances(serviceName, false); + instances.remove(instance); + when(client.getAllInstances(serviceName, false)).thenReturn(instances); + }; + } +} diff --git a/mode/cluster/repository/nacos/src/test/java/org/apache/shardingsphere/mode/repository/cluster/nacos/props/NacosPropertiesTest.java b/mode/cluster/repository/nacos/src/test/java/org/apache/shardingsphere/mode/repository/cluster/nacos/props/NacosPropertiesTest.java new file mode 100644 index 0000000..7e04a08 --- /dev/null +++ b/mode/cluster/repository/nacos/src/test/java/org/apache/shardingsphere/mode/repository/cluster/nacos/props/NacosPropertiesTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.mode.repository.cluster.nacos.props; + +import org.apache.shardingsphere.test.util.PropertiesBuilder; +import org.apache.shardingsphere.test.util.PropertiesBuilder.Property; +import org.junit.jupiter.api.Test; + +import java.util.Properties; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +class NacosPropertiesTest { + + @Test + void assertGetValue() { + NacosProperties actual = new NacosProperties(createProperties()); + assertThat(actual.getValue(NacosPropertyKey.CLUSTER_IP), is("127.0.0.1")); + assertThat(actual.getValue(NacosPropertyKey.RETRY_INTERVAL_MILLISECONDS), is(1000L)); + assertThat(actual.getValue(NacosPropertyKey.MAX_RETRIES), is(5)); + assertThat(actual.getValue(NacosPropertyKey.TIME_TO_LIVE_SECONDS), is(60)); + assertThat(actual.getValue(NacosPropertyKey.USERNAME), is("nacos")); + assertThat(actual.getValue(NacosPropertyKey.PASSWORD), is("nacos")); + } + + private Properties createProperties() { + return PropertiesBuilder.build( + new Property(NacosPropertyKey.CLUSTER_IP.getKey(), "127.0.0.1"), + new Property(NacosPropertyKey.RETRY_INTERVAL_MILLISECONDS.getKey(), "1000"), + new Property(NacosPropertyKey.MAX_RETRIES.getKey(), "5"), + new Property(NacosPropertyKey.TIME_TO_LIVE_SECONDS.getKey(), "60"), + new Property(NacosPropertyKey.USERNAME.getKey(), "nacos"), + new Property(NacosPropertyKey.PASSWORD.getKey(), "nacos")); + } + + @Test + void assertGetDefaultValue() { + NacosProperties actual = new NacosProperties(new Properties()); + assertThat(actual.getValue(NacosPropertyKey.CLUSTER_IP), is("")); + assertThat(actual.getValue(NacosPropertyKey.RETRY_INTERVAL_MILLISECONDS), is(500L)); + assertThat(actual.getValue(NacosPropertyKey.MAX_RETRIES), is(3)); + assertThat(actual.getValue(NacosPropertyKey.TIME_TO_LIVE_SECONDS), is(30)); + assertThat(actual.getValue(NacosPropertyKey.USERNAME), is("")); + assertThat(actual.getValue(NacosPropertyKey.PASSWORD), is("")); + } +} diff --git a/mode/cluster/repository/pom.xml b/mode/cluster/repository/pom.xml new file mode 100644 index 0000000..3b34a47 --- /dev/null +++ b/mode/cluster/repository/pom.xml @@ -0,0 +1,33 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.shardingsphere</groupId> + <artifactId>shardingsphere-plugin-mode-cluster</artifactId> + <version>5.4.1-SNAPSHOT</version> + </parent> + <artifactId>shardingsphere-plugin-mode-cluster-repository</artifactId> + <packaging>pom</packaging> + <name>${project.artifactId}</name> + + <modules> + <module>nacos</module> + </modules> +</project> diff --git a/mode/pom.xml b/mode/pom.xml new file mode 100644 index 0000000..b1f60e6 --- /dev/null +++ b/mode/pom.xml @@ -0,0 +1,33 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.shardingsphere</groupId> + <artifactId>shardingsphere-plugin</artifactId> + <version>5.4.1-SNAPSHOT</version> + </parent> + <artifactId>shardingsphere-plugin-mode</artifactId> + <packaging>pom</packaging> + <name>${project.artifactId}</name> + + <modules> + <module>cluster</module> + </modules> +</project> diff --git a/pom.xml b/pom.xml index 19867d4..354320f 100644 --- a/pom.xml +++ b/pom.xml @@ -33,6 +33,9 @@ <modules> <module>features</module> <module>infra</module> + <module>features/encrypt/like</module> + <module>mode</module> + <module>jdbc</module> </modules> <properties> @@ -57,6 +60,7 @@ <commons-codec.version>1.15</commons-codec.version> <lombok.version>1.18.20</lombok.version> <junit.version>5.9.2</junit.version> + <apollo-client.version>1.9.0</apollo-client.version> <!-- Compile plugin versions --> <maven-enforcer-plugin.version>3.2.1</maven-enforcer-plugin.version> @@ -79,6 +83,12 @@ <version>${lombok.version}</version> <scope>provided</scope> </dependency> + <dependency> + <groupId>com.ctrip.framework.apollo</groupId> + <artifactId>apollo-client</artifactId> + <version>${apollo-client.version}</version> + <scope>provided</scope> + </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-api</artifactId>
