This is an automated email from the ASF dual-hosted git repository. jianglongtao pushed a commit to branch fix-33341 in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
commit 5fac6aee512d0da71e0f45ccb9bc9d59ba7ffd64 Author: Raigor <[email protected]> AuthorDate: Tue Oct 15 14:57:20 2024 +0800 Pick #33248, Check weight load balancer props when create readwrite-splitting rule (#16) --- .../ReadwriteSplittingRuleStatementChecker.java | 42 ++++++++++++++++------ .../AlterReadwriteSplittingRuleExecutorTest.java | 22 +++++++++--- .../CreateReadwriteSplittingRuleExecutorTest.java | 19 +++++++--- 3 files changed, 65 insertions(+), 18 deletions(-) diff --git a/features/readwrite-splitting/distsql/handler/src/main/java/org/apache/shardingsphere/readwritesplitting/distsql/handler/checker/ReadwriteSplittingRuleStatementChecker.java b/features/readwrite-splitting/distsql/handler/src/main/java/org/apache/shardingsphere/readwritesplitting/distsql/handler/checker/ReadwriteSplittingRuleStatementChecker.java index de1a4550a0a..9c4cdb42f6f 100644 --- a/features/readwrite-splitting/distsql/handler/src/main/java/org/apache/shardingsphere/readwritesplitting/distsql/handler/checker/ReadwriteSplittingRuleStatementChecker.java +++ b/features/readwrite-splitting/distsql/handler/src/main/java/org/apache/shardingsphere/readwritesplitting/distsql/handler/checker/ReadwriteSplittingRuleStatementChecker.java @@ -20,6 +20,8 @@ package org.apache.shardingsphere.readwritesplitting.distsql.handler.checker; import com.google.common.base.Strings; import lombok.AccessLevel; import lombok.NoArgsConstructor; +import org.apache.shardingsphere.distsql.segment.AlgorithmSegment; +import org.apache.shardingsphere.infra.algorithm.core.exception.InvalidAlgorithmConfigurationException; import org.apache.shardingsphere.infra.algorithm.loadbalancer.core.LoadBalanceAlgorithm; import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions; import org.apache.shardingsphere.infra.exception.kernel.metadata.resource.storageunit.MissingRequiredStorageUnitsException; @@ -46,7 +48,6 @@ import java.util.HashSet; import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.Map.Entry; -import java.util.Objects; import java.util.stream.Collectors; /** @@ -94,7 +95,7 @@ public final class ReadwriteSplittingRuleStatementChecker { Collection<String> requiredRuleNames = segments.stream().map(ReadwriteSplittingRuleSegment::getName).collect(Collectors.toList()); Collection<String> currentRuleNames = currentRuleConfig.getDataSources().stream().map(ReadwriteSplittingDataSourceRuleConfiguration::getName).collect(Collectors.toList()); Collection<String> notExistedRuleNames = requiredRuleNames.stream().filter(each -> !currentRuleNames.contains(each)).collect(Collectors.toSet()); - ShardingSpherePreconditions.checkState(notExistedRuleNames.isEmpty(), () -> new MissingRequiredRuleException("Readwrite-splitting", databaseName, notExistedRuleNames)); + ShardingSpherePreconditions.checkMustEmpty(notExistedRuleNames, () -> new MissingRequiredRuleException("Readwrite-splitting", databaseName, notExistedRuleNames)); } private static void checkDuplicateRuleNames(final ShardingSphereDatabase database, @@ -108,12 +109,12 @@ public final class ReadwriteSplittingRuleStatementChecker { private static void checkDuplicateRuleNamesWithSelf(final String databaseName, final Collection<ReadwriteSplittingRuleSegment> segments) { Collection<String> duplicatedRuleNames = getDuplicated(segments.stream().map(ReadwriteSplittingRuleSegment::getName).collect(Collectors.toList())); - ShardingSpherePreconditions.checkState(duplicatedRuleNames.isEmpty(), () -> new DuplicateRuleException("Readwrite-splitting", databaseName, duplicatedRuleNames)); + ShardingSpherePreconditions.checkMustEmpty(duplicatedRuleNames, () -> new DuplicateRuleException("Readwrite-splitting", databaseName, duplicatedRuleNames)); } private static Collection<String> getDuplicated(final Collection<String> required) { return required.stream().collect(Collectors.groupingBy(each -> each, Collectors.counting())).entrySet().stream() - .filter(each -> each.getValue() > 1).map(Entry::getKey).collect(Collectors.toSet()); + .filter(each -> each.getValue() > 1L).map(Entry::getKey).collect(Collectors.toSet()); } private static void checkDuplicateRuleNamesWithExistsDataSources(final ShardingSphereDatabase database, final Collection<ReadwriteSplittingRuleSegment> segments) { @@ -124,7 +125,7 @@ public final class ReadwriteSplittingRuleStatementChecker { } currentRuleNames.addAll(getLogicDataSources(database)); Collection<String> toBeCreatedRuleNames = segments.stream().map(ReadwriteSplittingRuleSegment::getName).filter(currentRuleNames::contains).collect(Collectors.toList()); - ShardingSpherePreconditions.checkState(toBeCreatedRuleNames.isEmpty(), () -> new InvalidRuleConfigurationException("Readwrite-splitting", toBeCreatedRuleNames, + ShardingSpherePreconditions.checkMustEmpty(toBeCreatedRuleNames, () -> new InvalidRuleConfigurationException("Readwrite-splitting", toBeCreatedRuleNames, Collections.singleton(String.format("%s already exists in storage unit", toBeCreatedRuleNames)))); } @@ -135,7 +136,7 @@ public final class ReadwriteSplittingRuleStatementChecker { currentRuleNames.addAll(currentRuleConfig.getDataSources().stream().map(ReadwriteSplittingDataSourceRuleConfiguration::getName).collect(Collectors.toList())); } Collection<String> toBeCreatedRuleNames = segments.stream().map(ReadwriteSplittingRuleSegment::getName).filter(currentRuleNames::contains).collect(Collectors.toList()); - ShardingSpherePreconditions.checkState(toBeCreatedRuleNames.isEmpty(), () -> new DuplicateRuleException("Readwrite-splitting", databaseName, toBeCreatedRuleNames)); + ShardingSpherePreconditions.checkMustEmpty(toBeCreatedRuleNames, () -> new DuplicateRuleException("Readwrite-splitting", databaseName, toBeCreatedRuleNames)); } private static void checkDataSourcesExist(final String databaseName, final Collection<ReadwriteSplittingRuleSegment> segments, final ShardingSphereDatabase database) { @@ -145,7 +146,7 @@ public final class ReadwriteSplittingRuleStatementChecker { requiredDataSources.addAll(each.getReadDataSources()); }); Collection<String> notExistedDataSources = database.getResourceMetaData().getNotExistedDataSources(requiredDataSources); - ShardingSpherePreconditions.checkState(notExistedDataSources.isEmpty(), () -> new MissingRequiredStorageUnitsException(databaseName, notExistedDataSources)); + ShardingSpherePreconditions.checkMustEmpty(notExistedDataSources, () -> new MissingRequiredStorageUnitsException(databaseName, notExistedDataSources)); } private static Collection<String> getLogicDataSources(final ShardingSphereDatabase database) { @@ -211,14 +212,35 @@ public final class ReadwriteSplittingRuleStatementChecker { Collection<String> validStrategyNames = Arrays.stream(TransactionalReadQueryStrategy.values()).map(Enum::name).collect(Collectors.toSet()); for (ReadwriteSplittingRuleSegment each : segments) { if (null != each.getTransactionalReadQueryStrategy()) { - ShardingSpherePreconditions.checkState(validStrategyNames.contains(each.getTransactionalReadQueryStrategy().toUpperCase()), + ShardingSpherePreconditions.checkContains(validStrategyNames, each.getTransactionalReadQueryStrategy().toUpperCase(), () -> new MissingRequiredStrategyException("Transactional read query", Collections.singleton(each.getTransactionalReadQueryStrategy()))); } } } private static void checkLoadBalancers(final Collection<ReadwriteSplittingRuleSegment> segments) { - segments.stream().map(ReadwriteSplittingRuleSegment::getLoadBalancer).filter(Objects::nonNull) - .forEach(each -> TypedSPILoader.checkService(LoadBalanceAlgorithm.class, each.getName(), each.getProps())); + for (ReadwriteSplittingRuleSegment each : segments) { + AlgorithmSegment loadBalancer = each.getLoadBalancer(); + if (loadBalancer != null) { + TypedSPILoader.checkService(LoadBalanceAlgorithm.class, loadBalancer.getName(), loadBalancer.getProps()); + checkProperties(each); + } + } + } + + private static void checkProperties(final ReadwriteSplittingRuleSegment each) { + if ("WEIGHT".equalsIgnoreCase(each.getLoadBalancer().getName())) { + ShardingSpherePreconditions.checkNotEmpty(each.getLoadBalancer().getProps(), + () -> new InvalidAlgorithmConfigurationException("Load balancer", each.getLoadBalancer().getName())); + checkDataSource(each); + } + } + + private static void checkDataSource(final ReadwriteSplittingRuleSegment ruleSegment) { + for (Object each : ruleSegment.getLoadBalancer().getProps().keySet()) { + String dataSourceName = (String) each; + ShardingSpherePreconditions.checkState(ruleSegment.getReadDataSources().contains(dataSourceName) || ruleSegment.getWriteDataSource().equals(dataSourceName), + () -> new InvalidAlgorithmConfigurationException("Load balancer", ruleSegment.getLoadBalancer().getName())); + } } } diff --git a/features/readwrite-splitting/distsql/handler/src/test/java/org/apache/shardingsphere/readwritesplitting/distsql/handler/update/AlterReadwriteSplittingRuleExecutorTest.java b/features/readwrite-splitting/distsql/handler/src/test/java/org/apache/shardingsphere/readwritesplitting/distsql/handler/update/AlterReadwriteSplittingRuleExecutorTest.java index 75050a1ccb2..04a53b0704c 100644 --- a/features/readwrite-splitting/distsql/handler/src/test/java/org/apache/shardingsphere/readwritesplitting/distsql/handler/update/AlterReadwriteSplittingRuleExecutorTest.java +++ b/features/readwrite-splitting/distsql/handler/src/test/java/org/apache/shardingsphere/readwritesplitting/distsql/handler/update/AlterReadwriteSplittingRuleExecutorTest.java @@ -18,6 +18,7 @@ package org.apache.shardingsphere.readwritesplitting.distsql.handler.update; import org.apache.shardingsphere.distsql.segment.AlgorithmSegment; +import org.apache.shardingsphere.infra.algorithm.core.exception.InvalidAlgorithmConfigurationException; import org.apache.shardingsphere.infra.exception.kernel.metadata.resource.storageunit.MissingRequiredStorageUnitsException; import org.apache.shardingsphere.infra.exception.kernel.metadata.rule.MissingRequiredRuleException; import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase; @@ -29,6 +30,8 @@ import org.apache.shardingsphere.readwritesplitting.distsql.segment.ReadwriteSpl import org.apache.shardingsphere.readwritesplitting.distsql.statement.AlterReadwriteSplittingRuleStatement; import org.apache.shardingsphere.readwritesplitting.exception.actual.DuplicateReadwriteSplittingActualDataSourceException; import org.apache.shardingsphere.readwritesplitting.rule.ReadwriteSplittingRule; +import org.apache.shardingsphere.test.util.PropertiesBuilder; +import org.apache.shardingsphere.test.util.PropertiesBuilder.Property; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -112,7 +115,7 @@ class AlterReadwriteSplittingRuleExecutorTest { executor.setRule(rule); assertThrows(DuplicateReadwriteSplittingActualDataSourceException.class, () -> executor.checkBeforeUpdate( - createSQLStatement("readwrite_ds_0", "ds_write_1", Arrays.asList("read_ds_0", "read_ds_1"), "TEST"))); + createSQLStatement("readwrite_ds_0", "ds_write_1", Arrays.asList("read_ds_0", "read_ds_1"), "TEST", new Properties()))); } @Test @@ -134,7 +137,17 @@ class AlterReadwriteSplittingRuleExecutorTest { when(rule.getConfiguration()).thenReturn(createCurrentRuleConfigurationWithMultipleRules()); executor.setRule(rule); assertThrows(DuplicateReadwriteSplittingActualDataSourceException.class, - () -> executor.checkBeforeUpdate(createSQLStatement("readwrite_ds_1", "write_ds_1", Arrays.asList("read_ds_0_0", "read_ds_0_1"), "TEST"))); + () -> executor.checkBeforeUpdate(createSQLStatement("readwrite_ds_1", "write_ds_1", Arrays.asList("read_ds_0_0", "read_ds_0_1"), "TEST", new Properties()))); + } + + @Test + void assertCheckSQLStatementWithInvalidLoadBalancerProperties() { + ReadwriteSplittingRule rule = mock(ReadwriteSplittingRule.class); + when(rule.getConfiguration()).thenReturn(createCurrentRuleConfiguration()); + executor.setRule(rule); + assertThrows(InvalidAlgorithmConfigurationException.class, + () -> executor.checkBeforeUpdate(createSQLStatement("readwrite_ds", "write_ds", Arrays.asList("read_ds_0", "read_ds_1"), "weight", + PropertiesBuilder.build(new Property("read_ds_0", "5"), new Property("read_ds_2", "5"))))); } private AlterReadwriteSplittingRuleStatement createSQLStatement(final String loadBalancerTypeName) { @@ -143,8 +156,9 @@ class AlterReadwriteSplittingRuleExecutorTest { return new AlterReadwriteSplittingRuleStatement(Collections.singleton(ruleSegment)); } - private AlterReadwriteSplittingRuleStatement createSQLStatement(final String ruleName, final String writeDataSource, final Collection<String> readDataSources, final String loadBalancerName) { - ReadwriteSplittingRuleSegment ruleSegment = new ReadwriteSplittingRuleSegment(ruleName, writeDataSource, readDataSources, new AlgorithmSegment(loadBalancerName, new Properties())); + private AlterReadwriteSplittingRuleStatement createSQLStatement(final String ruleName, final String writeDataSource, final Collection<String> readDataSources, + final String loadBalancerName, final Properties props) { + ReadwriteSplittingRuleSegment ruleSegment = new ReadwriteSplittingRuleSegment(ruleName, writeDataSource, readDataSources, new AlgorithmSegment(loadBalancerName, props)); return new AlterReadwriteSplittingRuleStatement(Collections.singleton(ruleSegment)); } diff --git a/features/readwrite-splitting/distsql/handler/src/test/java/org/apache/shardingsphere/readwritesplitting/distsql/handler/update/CreateReadwriteSplittingRuleExecutorTest.java b/features/readwrite-splitting/distsql/handler/src/test/java/org/apache/shardingsphere/readwritesplitting/distsql/handler/update/CreateReadwriteSplittingRuleExecutorTest.java index fad29f3bc2a..84da1ca2590 100644 --- a/features/readwrite-splitting/distsql/handler/src/test/java/org/apache/shardingsphere/readwritesplitting/distsql/handler/update/CreateReadwriteSplittingRuleExecutorTest.java +++ b/features/readwrite-splitting/distsql/handler/src/test/java/org/apache/shardingsphere/readwritesplitting/distsql/handler/update/CreateReadwriteSplittingRuleExecutorTest.java @@ -18,6 +18,7 @@ package org.apache.shardingsphere.readwritesplitting.distsql.handler.update; import org.apache.shardingsphere.distsql.segment.AlgorithmSegment; +import org.apache.shardingsphere.infra.algorithm.core.exception.InvalidAlgorithmConfigurationException; import org.apache.shardingsphere.infra.exception.kernel.metadata.resource.storageunit.MissingRequiredStorageUnitsException; import org.apache.shardingsphere.infra.exception.kernel.metadata.rule.DuplicateRuleException; import org.apache.shardingsphere.infra.exception.kernel.metadata.rule.InvalidRuleConfigurationException; @@ -33,6 +34,8 @@ import org.apache.shardingsphere.readwritesplitting.exception.actual.DuplicateRe import org.apache.shardingsphere.readwritesplitting.rule.ReadwriteSplittingRule; import org.apache.shardingsphere.test.mock.AutoMockExtension; import org.apache.shardingsphere.test.mock.StaticMockSettings; +import org.apache.shardingsphere.test.util.PropertiesBuilder; +import org.apache.shardingsphere.test.util.PropertiesBuilder.Property; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -118,7 +121,7 @@ class CreateReadwriteSplittingRuleExecutorTest { when(rule.getConfiguration()).thenReturn(createCurrentRuleConfiguration()); executor.setRule(rule); assertThrows(DuplicateReadwriteSplittingActualDataSourceException.class, - () -> executor.checkBeforeUpdate(createSQLStatement("readwrite_ds_1", "ds_write", Arrays.asList("read_ds_0", "read_ds_1"), "TEST"))); + () -> executor.checkBeforeUpdate(createSQLStatement("readwrite_ds_1", "ds_write", Arrays.asList("read_ds_0", "read_ds_1"), "TEST", new Properties()))); } @Test @@ -133,7 +136,14 @@ class CreateReadwriteSplittingRuleExecutorTest { when(rule.getConfiguration()).thenReturn(createCurrentRuleConfiguration()); executor.setRule(rule); assertThrows(DuplicateReadwriteSplittingActualDataSourceException.class, - () -> executor.checkBeforeUpdate(createSQLStatement("readwrite_ds_1", "write_ds_1", Arrays.asList("read_ds_0", "read_ds_1"), "TEST"))); + () -> executor.checkBeforeUpdate(createSQLStatement("readwrite_ds_1", "write_ds_1", Arrays.asList("read_ds_0", "read_ds_1"), "TEST", new Properties()))); + } + + @Test + void assertCheckSQLStatementWithInvalidLoadBalancerProperties() { + assertThrows(InvalidAlgorithmConfigurationException.class, + () -> executor.checkBeforeUpdate(createSQLStatement("readwrite_group", "write_ds", Arrays.asList("read_ds_0", "read_ds_1"), "weight", + PropertiesBuilder.build(new Property("read_ds_0", "5"), new Property("read_ds_2", "5"))))); } @Test @@ -172,8 +182,9 @@ class CreateReadwriteSplittingRuleExecutorTest { new AlgorithmSegment(loadBalancerName, new Properties()))); } - private CreateReadwriteSplittingRuleStatement createSQLStatement(final String ruleName, final String writeDataSource, final Collection<String> readDataSources, final String loadBalancerName) { - return createSQLStatement(false, new ReadwriteSplittingRuleSegment(ruleName, writeDataSource, readDataSources, new AlgorithmSegment(loadBalancerName, new Properties()))); + private CreateReadwriteSplittingRuleStatement createSQLStatement(final String ruleName, final String writeDataSource, final Collection<String> readDataSources, + final String loadBalancerName, final Properties props) { + return createSQLStatement(false, new ReadwriteSplittingRuleSegment(ruleName, writeDataSource, readDataSources, new AlgorithmSegment(loadBalancerName, props))); } private CreateReadwriteSplittingRuleStatement createSQLStatement(final boolean ifNotExists, final ReadwriteSplittingRuleSegment... ruleSegments) {
