This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 1299783  Refactor CreateReadwriteSplittingRuleBackendHandler (#10338)
1299783 is described below

commit 12997837cb4a405778cfe19358bdf72018ceaa8a
Author: Haoran Meng <[email protected]>
AuthorDate: Fri May 14 20:56:01 2021 +0800

    Refactor CreateReadwriteSplittingRuleBackendHandler (#10338)
    
    Co-authored-by: menghaoranss <[email protected]>
---
 .../db/protocol/error/CommonErrorCode.java         |   6 +-
 .../src/main/antlr4/imports/RDLStatement.g4        |   8 --
 .../distsql/parser/autogen/DistSQLStatement.g4     |   1 -
 .../distsql/parser/core/DistSQLVisitor.java        |  34 +----
 ...teReadwriteSplittingRuleStatementConverter.java |  39 ++++--
 ...ion.java => InvalidLoadBalancersException.java} |  16 ++-
 ...eadwriteSplittingRuleCreateExistsException.java |   2 +
 ...CreateReadwriteSplittingRuleBackendHandler.java |  40 +++++-
 .../fixture/TestReplicaLoadBalanceAlgorithm.java}  |  26 +++-
 ...teReadwriteSplittingRuleBackendHandlerTest.java | 138 +++++++++++++++++++++
 ...dwritesplitting.spi.ReplicaLoadBalanceAlgorithm |  18 +++
 .../frontend/mysql/err/MySQLErrPacketFactory.java  |   6 +-
 12 files changed, 262 insertions(+), 72 deletions(-)

diff --git a/shardingsphere-db-protocol/shardingsphere-db-protocol-core/src/main/java/org/apache/shardingsphere/db/protocol/error/CommonErrorCode.java b/shardingsphere-db-protocol/shardingsphere-db-protocol-core/src/main/java/org/apache/shardingsphere/db/protocol/error/CommonErrorCode.java
index ea48a7a..59d6124 100644
--- a/shardingsphere-db-protocol/shardingsphere-db-protocol-core/src/main/java/org/apache/shardingsphere/db/protocol/error/CommonErrorCode.java
+++ b/shardingsphere-db-protocol/shardingsphere-db-protocol-core/src/main/java/org/apache/shardingsphere/db/protocol/error/CommonErrorCode.java
@@ -42,8 +42,8 @@ public enum CommonErrorCode implements SQLErrorCode {
     REPLICA_QUERY_RULE_DATA_SOURCE_NOT_EXIST(1106, "C1106", "Data sources %s in replica query rule do not exist."),
     
     ADD_REPLICA_QUERY_RULE_DATA_SOURCE_EXIST(1107, "C1107", "Data sources %s in replica query rule already exists."),
-    
-    REPLICA_QUERY_RULE_EXIST(1108, "C1108", "Replica query rule already exists."),
+
+    READWRITE_SPLITTING_RULE_EXIST(1108, "C1108", "Readwrite splitting rule already exists in schema %s."),
     
     SHARDING_RULE_NOT_EXIST(1109, "C1109", "Sharding rule does not exist."),
     
@@ -61,6 +61,8 @@ public enum CommonErrorCode implements SQLErrorCode {
 
     SHARDING_BROADCAST_TABLE_RULES_NOT_EXIST(1116, "C1116", "Sharding broadcast table rules not exist in schema %s."),
 
+    INVALID_LOAD_BALANCERS(1117, "C1117", "Invalid load balancers %s."),
+
     SCALING_JOB_NOT_EXIST(1201, "C1201", "Scaling job %s does not exist."),
     
     SCALING_OPERATE_FAILED(1209, "C1209", "Scaling Operate Failed: [%s]"),
diff --git a/shardingsphere-distsql-parser/shardingsphere-distsql-parser-engine/src/main/antlr4/imports/RDLStatement.g4 b/shardingsphere-distsql-parser/shardingsphere-distsql-parser-engine/src/main/antlr4/imports/RDLStatement.g4
index aa85a4c..fa304e2 100644
--- a/shardingsphere-distsql-parser/shardingsphere-distsql-parser-engine/src/main/antlr4/imports/RDLStatement.g4
+++ b/shardingsphere-distsql-parser/shardingsphere-distsql-parser-engine/src/main/antlr4/imports/RDLStatement.g4
@@ -103,14 +103,6 @@ dynamicReadwriteSplittingRuleDefinition
     : AUTO_AWARE_RESOURCE EQ IDENTIFIER
     ;
 
-createReplicaQueryRule
-    : CREATE REPLICA_QUERY RULE LP replicaQueryRuleDefinition (COMMA replicaQueryRuleDefinition)* RP
-    ;
-
-replicaQueryRuleDefinition
-    : ruleName LP PRIMARY EQ primary=schemaName COMMA REPLICA EQ schemaNames RP functionDefinition
-    ;
-
 alterReplicaQueryRule
     : ALTER REPLICA_QUERY RULE LP alterReplicaQueryRuleDefinition (COMMA alterReplicaQueryRuleDefinition)* RP
     ;
diff --git a/shardingsphere-distsql-parser/shardingsphere-distsql-parser-engine/src/main/antlr4/org/apache/shardingsphere/distsql/parser/autogen/DistSQLStatement.g4 b/shardingsphere-distsql-parser/shardingsphere-distsql-parser-engine/src/main/antlr4/org/apache/shardingsphere/distsql/parser/autogen/DistSQLStatement.g4
index b328604..321e181 100644
--- a/shardingsphere-distsql-parser/shardingsphere-distsql-parser-engine/src/main/antlr4/org/apache/shardingsphere/distsql/parser/autogen/DistSQLStatement.g4
+++ b/shardingsphere-distsql-parser/shardingsphere-distsql-parser-engine/src/main/antlr4/org/apache/shardingsphere/distsql/parser/autogen/DistSQLStatement.g4
@@ -32,7 +32,6 @@ execute
     | dropShardingTableRule
     | dropShardingBindingTableRules
     | dropShardingBroadcastTableRules
-    | createReplicaQueryRule
     | alterReplicaQueryRule
     | dropReplicaQueryRule
     | showResources
diff --git 
a/shardingsphere-distsql-parser/shardingsphere-distsql-parser-engine/src/main/java/org/apache/shardingsphere/distsql/parser/core/DistSQLVisitor.java
 
b/shardingsphere-distsql-parser/shardingsphere-distsql-parser-engine/src/main/java/org/apache/shardingsphere/distsql/parser/core/DistSQLVisitor.java
index 89f1ea9..f8b14cb 100644
--- 
a/shardingsphere-distsql-parser/shardingsphere-distsql-parser-engine/src/main/java/org/apache/shardingsphere/distsql/parser/core/DistSQLVisitor.java
+++ 
b/shardingsphere-distsql-parser/shardingsphere-distsql-parser-engine/src/main/java/org/apache/shardingsphere/distsql/parser/core/DistSQLVisitor.java
@@ -28,7 +28,6 @@ import 
org.apache.shardingsphere.distsql.parser.autogen.DistSQLStatementParser.A
 import 
org.apache.shardingsphere.distsql.parser.autogen.DistSQLStatementParser.AlterReplicaQueryRuleDefinitionContext;
 import 
org.apache.shardingsphere.distsql.parser.autogen.DistSQLStatementParser.BindTableRulesDefinitionContext;
 import 
org.apache.shardingsphere.distsql.parser.autogen.DistSQLStatementParser.CheckScalingJobContext;
-import 
org.apache.shardingsphere.distsql.parser.autogen.DistSQLStatementParser.CreateReplicaQueryRuleContext;
 import 
org.apache.shardingsphere.distsql.parser.autogen.DistSQLStatementParser.CreateShardingBindingTableRulesContext;
 import 
org.apache.shardingsphere.distsql.parser.autogen.DistSQLStatementParser.CreateShardingBroadcastTableRulesContext;
 import 
org.apache.shardingsphere.distsql.parser.autogen.DistSQLStatementParser.CreateShardingTableRuleContext;
@@ -37,7 +36,6 @@ import 
org.apache.shardingsphere.distsql.parser.autogen.DistSQLStatementParser.D
 import 
org.apache.shardingsphere.distsql.parser.autogen.DistSQLStatementParser.DropResourceContext;
 import 
org.apache.shardingsphere.distsql.parser.autogen.DistSQLStatementParser.DropScalingJobContext;
 import 
org.apache.shardingsphere.distsql.parser.autogen.DistSQLStatementParser.FunctionDefinitionContext;
-import 
org.apache.shardingsphere.distsql.parser.autogen.DistSQLStatementParser.ReplicaQueryRuleDefinitionContext;
 import 
org.apache.shardingsphere.distsql.parser.autogen.DistSQLStatementParser.ResetScalingJobContext;
 import 
org.apache.shardingsphere.distsql.parser.autogen.DistSQLStatementParser.SchemaNameContext;
 import 
org.apache.shardingsphere.distsql.parser.autogen.DistSQLStatementParser.ShardingTableRuleDefinitionContext;
@@ -142,16 +140,7 @@ public final class DistSQLVisitor extends 
DistSQLStatementBaseVisitor<ASTNode> {
         }
         return result;
     }
-    
-    @Override
-    public ASTNode visitCreateReplicaQueryRule(final 
CreateReplicaQueryRuleContext ctx) {
-        Collection<ReadwriteSplittingRuleSegment> replicaQueryRules = new 
LinkedList<>();
-        for (ReplicaQueryRuleDefinitionContext each : 
ctx.replicaQueryRuleDefinition()) {
-            replicaQueryRules.add((ReadwriteSplittingRuleSegment) visit(each));
-        }
-        return new CreateReadwriteSplittingRuleStatement(replicaQueryRules);
-    }
-    
+
     @Override
     public ASTNode visitCreateShardingBroadcastTableRules(final 
CreateShardingBroadcastTableRulesContext ctx) {
         CreateShardingBroadcastTableRulesStatement result = new 
CreateShardingBroadcastTableRulesStatement();
@@ -223,27 +212,6 @@ public final class DistSQLVisitor extends 
DistSQLStatementBaseVisitor<ASTNode> {
         result.setAutoAwareResource(ctx.IDENTIFIER().getText());
         return result;
     }
-
-    @Override
-    public ASTNode visitReplicaQueryRuleDefinition(final 
ReplicaQueryRuleDefinitionContext ctx) {
-        ReadwriteSplittingRuleSegment result = new 
ReadwriteSplittingRuleSegment();
-        Collection<String> replicaDatasources = new LinkedList<>();
-        for (SchemaNameContext each : ctx.schemaNames().schemaName()) {
-            replicaDatasources.add(each.getText());
-        }
-        Properties props = new Properties();
-        if (null != ctx.functionDefinition().algorithmProperties()) {
-            for (AlgorithmPropertyContext each : 
ctx.functionDefinition().algorithmProperties().algorithmProperty()) {
-                props.setProperty(each.key.getText(), each.value.getText());
-            }
-        }
-        result.setName(ctx.ruleName().getText());
-        result.setWriteDataSource(ctx.primary.getText());
-        result.setReadDataSources(replicaDatasources);
-        
result.setLoadBalancer(ctx.functionDefinition().functionName().getText());
-        result.setProps(props);
-        return result;
-    }
     
     @Override
     public ASTNode visitAlterReplicaQueryRule(final 
AlterReplicaQueryRuleContext ctx) {
diff --git 
a/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-common/src/main/java/org/apache/shardingsphere/readwritesplitting/common/yaml/converter/CreateReadwriteSplittingRuleStatementConverter.java
 
b/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-common/src/main/java/org/apache/shardingsphere/readwritesplitting/common/yaml/converter/CreateReadwriteSplittingRuleStatementConverter.java
index c9589b9..cfeee93 100644
--- 
a/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-common/src/main/java/org/apache/shardingsphere/readwritesplitting/common/yaml/converter/CreateReadwriteSplittingRuleStatementConverter.java
+++ 
b/shardingsphere-features/shardingsphere-readwrite-splitting/shardingsphere-readwrite-splitting-common/src/main/java/org/apache/shardingsphere/readwritesplitting/common/yaml/converter/CreateReadwriteSplittingRuleStatementConverter.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.readwritesplitting.common.yaml.converter;
 
+import com.google.common.base.Strings;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import 
org.apache.shardingsphere.readwritesplitting.common.yaml.config.YamlReadwriteSplittingRuleConfiguration;
@@ -40,17 +41,35 @@ public final class 
CreateReadwriteSplittingRuleStatementConverter {
     public static YamlReadwriteSplittingRuleConfiguration convert(final 
CreateReadwriteSplittingRuleStatement sqlStatement) {
         YamlReadwriteSplittingRuleConfiguration result = new 
YamlReadwriteSplittingRuleConfiguration();
         for (ReadwriteSplittingRuleSegment each : 
sqlStatement.getReadwriteSplittingRules()) {
-            YamlReadwriteSplittingDataSourceRuleConfiguration 
dataSourceRuleConfiguration = new 
YamlReadwriteSplittingDataSourceRuleConfiguration();
-            
dataSourceRuleConfiguration.setWriteDataSourceName(each.getWriteDataSource());
-            
dataSourceRuleConfiguration.getReadDataSourceNames().addAll(each.getReadDataSources());
-            
dataSourceRuleConfiguration.setLoadBalancerName(each.getLoadBalancer());
-            dataSourceRuleConfiguration.setProps(each.getProps());
-            result.getDataSources().put(each.getName(), 
dataSourceRuleConfiguration);
-            YamlShardingSphereAlgorithmConfiguration loadBalancer = new 
YamlShardingSphereAlgorithmConfiguration();
-            loadBalancer.setType(each.getLoadBalancer());
-            loadBalancer.setProps(each.getProps());
-            result.getLoadBalancers().put(each.getLoadBalancer(), 
loadBalancer);
+            String loadBalancerName = getLoadBalancerName(each.getName(), 
each.getLoadBalancer());
+            result.getDataSources().put(each.getName(), 
buildDataSourceRuleConfiguration(loadBalancerName, each));
+            result.getLoadBalancers().put(loadBalancerName, 
buildLoadBalancer(each));
         }
         return result;
     }
+
+    private static YamlReadwriteSplittingDataSourceRuleConfiguration 
buildDataSourceRuleConfiguration(final String loadBalancerName,
+                                                                               
                       final ReadwriteSplittingRuleSegment 
readwriteSplittingRuleSegment) {
+        YamlReadwriteSplittingDataSourceRuleConfiguration result = new 
YamlReadwriteSplittingDataSourceRuleConfiguration();
+        if 
(Strings.isNullOrEmpty(readwriteSplittingRuleSegment.getAutoAwareResource())) {
+            
result.setWriteDataSourceName(readwriteSplittingRuleSegment.getWriteDataSource());
+            
result.getReadDataSourceNames().addAll(readwriteSplittingRuleSegment.getReadDataSources());
+        } else {
+            
result.setAutoAwareDataSourceName(readwriteSplittingRuleSegment.getAutoAwareResource());
+        }
+        result.setLoadBalancerName(loadBalancerName);
+        result.setProps(readwriteSplittingRuleSegment.getProps());
+        return result;
+    }
+
+    private static YamlShardingSphereAlgorithmConfiguration 
buildLoadBalancer(final ReadwriteSplittingRuleSegment 
readwriteSplittingRuleSegment) {
+        YamlShardingSphereAlgorithmConfiguration result = new 
YamlShardingSphereAlgorithmConfiguration();
+        result.setType(readwriteSplittingRuleSegment.getLoadBalancer());
+        result.setProps(readwriteSplittingRuleSegment.getProps());
+        return result;
+    }
+
+    private static String getLoadBalancerName(final String ruleName, final 
String loadBalancerType) {
+        return String.format("%s_%s", ruleName, loadBalancerType);
+    }
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/exception/ReadwriteSplittingRuleCreateExistsException.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/exception/InvalidLoadBalancersException.java
similarity index 73%
copy from 
shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/exception/ReadwriteSplittingRuleCreateExistsException.java
copy to 
shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/exception/InvalidLoadBalancersException.java
index 5ef819b..6e30b26 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/exception/ReadwriteSplittingRuleCreateExistsException.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/exception/InvalidLoadBalancersException.java
@@ -17,15 +17,19 @@
 
 package org.apache.shardingsphere.proxy.backend.exception;
 
+import lombok.AllArgsConstructor;
 import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+
+import java.util.Collection;
 
 /**
- * Readwrite-splitting rule create exists exception.
+ * Invalid load balancers exception.
  */
-@RequiredArgsConstructor
+@AllArgsConstructor
 @Getter
-public final class ReadwriteSplittingRuleCreateExistsException extends 
BackendException {
-    
-    private static final long serialVersionUID = -6902287715467426449L;
+public final class InvalidLoadBalancersException extends BackendException {
+
+    private static final long serialVersionUID = -8028595443792872970L;
+
+    private final Collection<String> loadBalancers;
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/exception/ReadwriteSplittingRuleCreateExistsException.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/exception/ReadwriteSplittingRuleCreateExistsException.java
index 5ef819b..5b03bd6 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/exception/ReadwriteSplittingRuleCreateExistsException.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/exception/ReadwriteSplittingRuleCreateExistsException.java
@@ -28,4 +28,6 @@ import lombok.RequiredArgsConstructor;
 public final class ReadwriteSplittingRuleCreateExistsException extends 
BackendException {
     
     private static final long serialVersionUID = -6902287715467426449L;
+
+    private final String schemaName;
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/rdl/impl/CreateReadwriteSplittingRuleBackendHandler.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/rdl/impl/CreateReadwriteSplittingRuleBackendHandler.java
index c4c0a95..f1429df 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/rdl/impl/CreateReadwriteSplittingRuleBackendHandler.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/distsql/rdl/impl/CreateReadwriteSplittingRuleBackendHandler.java
@@ -17,26 +17,35 @@
 
 package org.apache.shardingsphere.proxy.backend.text.distsql.rdl.impl;
 
+import com.google.common.base.Strings;
 import 
org.apache.shardingsphere.distsql.parser.statement.rdl.create.impl.CreateReadwriteSplittingRuleStatement;
 import 
org.apache.shardingsphere.governance.core.registry.listener.event.rule.RuleConfigurationsAlteredEvent;
 import org.apache.shardingsphere.infra.config.RuleConfiguration;
 import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
+import org.apache.shardingsphere.infra.spi.typed.TypedSPIRegistry;
 import 
org.apache.shardingsphere.infra.yaml.swapper.YamlRuleConfigurationSwapperEngine;
 import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
+import 
org.apache.shardingsphere.proxy.backend.exception.InvalidLoadBalancersException;
 import 
org.apache.shardingsphere.proxy.backend.exception.ReadwriteSplittingRuleCreateExistsException;
+import 
org.apache.shardingsphere.proxy.backend.exception.ResourceNotExistedException;
 import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
 import 
org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
 import 
org.apache.shardingsphere.proxy.backend.text.SchemaRequiredBackendHandler;
 import 
org.apache.shardingsphere.readwritesplitting.api.ReadwriteSplittingRuleConfiguration;
 import 
org.apache.shardingsphere.readwritesplitting.common.yaml.config.YamlReadwriteSplittingRuleConfiguration;
 import 
org.apache.shardingsphere.readwritesplitting.common.yaml.converter.CreateReadwriteSplittingRuleStatementConverter;
+import 
org.apache.shardingsphere.readwritesplitting.spi.ReplicaLoadBalanceAlgorithm;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.stream.Collectors;
 
 /**
- * Create readwrite-splitting rule backend handler.
+ * Create readwrite splitting rule backend handler.
  */
 public final class CreateReadwriteSplittingRuleBackendHandler extends 
SchemaRequiredBackendHandler<CreateReadwriteSplittingRuleStatement> {
     
@@ -46,19 +55,40 @@ public final class 
CreateReadwriteSplittingRuleBackendHandler extends SchemaRequ
     
     @Override
     public ResponseHeader execute(final String schemaName, final 
CreateReadwriteSplittingRuleStatement sqlStatement) {
-        check(schemaName);
+        check(schemaName, sqlStatement);
         YamlReadwriteSplittingRuleConfiguration config = 
CreateReadwriteSplittingRuleStatementConverter.convert(sqlStatement);
         Collection<RuleConfiguration> rules = new 
YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(Collections.singleton(config));
         post(schemaName, rules);
         return new UpdateResponseHeader(sqlStatement);
     }
     
-    private void check(final String schemaName) {
+    private void check(final String schemaName, final 
CreateReadwriteSplittingRuleStatement sqlStatement) {
         if 
(ProxyContext.getInstance().getMetaData(schemaName).getRuleMetaData().getConfigurations().stream().anyMatch(each
 -> each instanceof ReadwriteSplittingRuleConfiguration)) {
-            throw new ReadwriteSplittingRuleCreateExistsException();
+            throw new ReadwriteSplittingRuleCreateExistsException(schemaName);
+        }
+        Collection<String> resources = new LinkedHashSet<>();
+        sqlStatement.getReadwriteSplittingRules().stream().filter(each -> 
Strings.isNullOrEmpty(each.getAutoAwareResource())).forEach(each -> {
+            resources.add(each.getWriteDataSource());
+            resources.addAll(each.getReadDataSources());
+        });
+
+        Collection<String> notExistResources = resources.stream().filter(each 
-> !this.isValidResource(schemaName, each)).collect(Collectors.toList());
+        if (!notExistResources.isEmpty()) {
+            throw new ResourceNotExistedException(notExistResources);
+        }
+        Collection<String> invalidLoadBalances = 
sqlStatement.getReadwriteSplittingRules().stream().map(each -> 
each.getLoadBalancer()).distinct()
+                .filter(each -> 
!TypedSPIRegistry.findRegisteredService(ReplicaLoadBalanceAlgorithm.class, 
each, new Properties()).isPresent())
+                .collect(Collectors.toList());
+        if (!invalidLoadBalances.isEmpty()) {
+            throw new InvalidLoadBalancersException(invalidLoadBalances);
         }
     }
-    
+
+    private boolean isValidResource(final String schemaName, final String 
resourceName) {
+        return 
Objects.nonNull(ProxyContext.getInstance().getMetaData(schemaName).getResource())
+                && 
ProxyContext.getInstance().getMetaData(schemaName).getResource().getDataSources().containsKey(resourceName);
+    }
+
     private void post(final String schemaName, final 
Collection<RuleConfiguration> rules) {
         ShardingSphereEventBus.getInstance().post(new 
RuleConfigurationsAlteredEvent(schemaName, rules));
     }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/exception/ReadwriteSplittingRuleCreateExistsException.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/distsql/fixture/TestReplicaLoadBalanceAlgorithm.java
similarity index 55%
copy from 
shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/exception/ReadwriteSplittingRuleCreateExistsException.java
copy to 
shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/distsql/fixture/TestReplicaLoadBalanceAlgorithm.java
index 5ef819b..79217b3 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/exception/ReadwriteSplittingRuleCreateExistsException.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/distsql/fixture/TestReplicaLoadBalanceAlgorithm.java
@@ -15,17 +15,31 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.proxy.backend.exception;
+package org.apache.shardingsphere.proxy.backend.text.distsql.fixture;
 
 import lombok.Getter;
-import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+import 
org.apache.shardingsphere.readwritesplitting.spi.ReplicaLoadBalanceAlgorithm;
+
+import java.util.List;
+import java.util.Properties;
 
 /**
- * Readwrite-splitting rule create exists exception.
+ * Test replica load balance algorithm.
  */
-@RequiredArgsConstructor
 @Getter
-public final class ReadwriteSplittingRuleCreateExistsException extends 
BackendException {
+@Setter
+public final class TestReplicaLoadBalanceAlgorithm implements 
ReplicaLoadBalanceAlgorithm {
+    
+    private Properties props = new Properties();
+    
+    @Override
+    public String getDataSource(final String name, final String 
writeDataSourceName, final List<String> readDataSourceNames) {
+        return null;
+    }
     
-    private static final long serialVersionUID = -6902287715467426449L;
+    @Override
+    public String getType() {
+        return "TEST";
+    }
 }
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/distsql/rdl/impl/CreateReadwriteSplittingRuleBackendHandlerTest.java
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/distsql/rdl/impl/CreateReadwriteSplittingRuleBackendHandlerTest.java
new file mode 100644
index 0000000..4275b5c
--- /dev/null
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/text/distsql/rdl/impl/CreateReadwriteSplittingRuleBackendHandlerTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.backend.text.distsql.rdl.impl;
+
+import com.google.common.collect.Maps;
+import 
org.apache.shardingsphere.distsql.parser.segment.rdl.ReadwriteSplittingRuleSegment;
+import 
org.apache.shardingsphere.distsql.parser.statement.rdl.create.impl.CreateReadwriteSplittingRuleStatement;
+import org.apache.shardingsphere.infra.context.metadata.MetaDataContexts;
+import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
+import 
org.apache.shardingsphere.infra.metadata.resource.ShardingSphereResource;
+import 
org.apache.shardingsphere.infra.metadata.rule.ShardingSphereRuleMetaData;
+import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
+import 
org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
+import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
+import 
org.apache.shardingsphere.proxy.backend.exception.InvalidLoadBalancersException;
+import 
org.apache.shardingsphere.proxy.backend.exception.ReadwriteSplittingRuleCreateExistsException;
+import 
org.apache.shardingsphere.proxy.backend.exception.ResourceNotExistedException;
+import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
+import 
org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
+import 
org.apache.shardingsphere.readwritesplitting.api.ReadwriteSplittingRuleConfiguration;
+import 
org.apache.shardingsphere.readwritesplitting.spi.ReplicaLoadBalanceAlgorithm;
+import org.apache.shardingsphere.transaction.context.TransactionContexts;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import javax.sql.DataSource;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public final class CreateReadwriteSplittingRuleBackendHandlerTest {
+    
+    @Mock
+    private BackendConnection backendConnection;
+    
+    @Mock
+    private CreateReadwriteSplittingRuleStatement sqlStatement;
+    
+    @Mock
+    private MetaDataContexts metaDataContexts;
+    
+    @Mock
+    private TransactionContexts transactionContexts;
+    
+    @Mock
+    private ShardingSphereMetaData shardingSphereMetaData;
+    
+    @Mock
+    private ShardingSphereRuleMetaData ruleMetaData;
+
+    @Mock
+    private ShardingSphereResource shardingSphereResource;
+    
+    private CreateReadwriteSplittingRuleBackendHandler handler = new 
CreateReadwriteSplittingRuleBackendHandler(sqlStatement, backendConnection);
+    
+    @Before
+    public void setUp() {
+        
ShardingSphereServiceLoader.register(ReplicaLoadBalanceAlgorithm.class);
+        ProxyContext.getInstance().init(metaDataContexts, transactionContexts);
+        
when(metaDataContexts.getAllSchemaNames()).thenReturn(Collections.singletonList("test"));
+        
when(metaDataContexts.getMetaData(eq("test"))).thenReturn(shardingSphereMetaData);
+        
when(shardingSphereMetaData.getRuleMetaData()).thenReturn(ruleMetaData);
+    }
+    
+    @Test
+    public void assertExecute() {
+        ReadwriteSplittingRuleSegment readwriteSplittingRuleSegment = new 
ReadwriteSplittingRuleSegment();
+        readwriteSplittingRuleSegment.setName("pr_ds");
+        readwriteSplittingRuleSegment.setWriteDataSource("ds_write");
+        
readwriteSplittingRuleSegment.setReadDataSources(Arrays.asList("ds_read_0", 
"ds_read_1"));
+        readwriteSplittingRuleSegment.setLoadBalancer("TEST");
+        
when(sqlStatement.getReadwriteSplittingRules()).thenReturn(Collections.singletonList(readwriteSplittingRuleSegment));
+        
when(shardingSphereMetaData.getResource()).thenReturn(shardingSphereResource);
+        Map<String, DataSource> dataSourceMap = mock(Map.class);
+        
when(shardingSphereResource.getDataSources()).thenReturn(dataSourceMap);
+        when(dataSourceMap.containsKey(anyString())).thenReturn(true);
+        ResponseHeader responseHeader = handler.execute("test", sqlStatement);
+        assertNotNull(responseHeader);
+        assertTrue(responseHeader instanceof UpdateResponseHeader);
+    }
+    
+    @Test(expected = ReadwriteSplittingRuleCreateExistsException.class)
+    public void assertExecuteWithExistReadwriteSplittingRule() {
+        
when(ruleMetaData.getConfigurations()).thenReturn(Collections.singletonList(new 
ReadwriteSplittingRuleConfiguration(Collections.emptyList(), 
Maps.newHashMap())));
+        handler.execute("test", sqlStatement);
+    }
+    
+    @Test(expected = ResourceNotExistedException.class)
+    public void assertExecuteWithNotExistResources() {
+        ReadwriteSplittingRuleSegment readwriteSplittingRuleSegment = new 
ReadwriteSplittingRuleSegment();
+        readwriteSplittingRuleSegment.setName("pr_ds");
+        readwriteSplittingRuleSegment.setWriteDataSource("ds_write");
+        
readwriteSplittingRuleSegment.setReadDataSources(Arrays.asList("ds_read_0", 
"ds_read_1"));
+        
when(sqlStatement.getReadwriteSplittingRules()).thenReturn(Collections.singletonList(readwriteSplittingRuleSegment));
+        handler.execute("test", sqlStatement);
+    }
+
+    @Test(expected = InvalidLoadBalancersException.class)
+    public void assertExecuteWithInvalidLoadBalancer() {
+        ReadwriteSplittingRuleSegment readwriteSplittingRuleSegment = new 
ReadwriteSplittingRuleSegment();
+        readwriteSplittingRuleSegment.setName("pr_ds");
+        readwriteSplittingRuleSegment.setWriteDataSource("ds_write");
+        
readwriteSplittingRuleSegment.setReadDataSources(Arrays.asList("ds_read_0", 
"ds_read_1"));
+        readwriteSplittingRuleSegment.setLoadBalancer("notExistLoadBalancer");
+        
when(sqlStatement.getReadwriteSplittingRules()).thenReturn(Collections.singletonList(readwriteSplittingRuleSegment));
+        
when(shardingSphereMetaData.getResource()).thenReturn(shardingSphereResource);
+        Map<String, DataSource> dataSourceMap = mock(Map.class);
+        
when(shardingSphereResource.getDataSources()).thenReturn(dataSourceMap);
+        when(dataSourceMap.containsKey(anyString())).thenReturn(true);
+        handler.execute("test", sqlStatement);
+    }
+}
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/resources/META-INF/services/org.apache.shardingsphere.readwritesplitting.spi.ReplicaLoadBalanceAlgorithm
 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/resources/META-INF/services/org.apache.shardingsphere.readwritesplitting.spi.ReplicaLoadBalanceAlgorithm
new file mode 100644
index 0000000..83a536f
--- /dev/null
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/resources/META-INF/services/org.apache.shardingsphere.readwritesplitting.spi.ReplicaLoadBalanceAlgorithm
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.proxy.backend.text.distsql.fixture.TestReplicaLoadBalanceAlgorithm
diff --git 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/err/MySQLErrPacketFactory.java
 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/err/MySQLErrPacketFactory.java
index 169f128..2bed806 100644
--- 
a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/err/MySQLErrPacketFactory.java
+++ 
b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-mysql/src/main/java/org/apache/shardingsphere/proxy/frontend/mysql/err/MySQLErrPacketFactory.java
@@ -29,6 +29,7 @@ import 
org.apache.shardingsphere.proxy.backend.exception.DBCreateExistsException
 import org.apache.shardingsphere.proxy.backend.exception.DBDropExistsException;
 import 
org.apache.shardingsphere.proxy.backend.exception.DuplicateResourceException;
 import 
org.apache.shardingsphere.proxy.backend.exception.DuplicateTablesException;
+import 
org.apache.shardingsphere.proxy.backend.exception.InvalidLoadBalancersException;
 import 
org.apache.shardingsphere.proxy.backend.exception.InvalidResourceException;
 import 
org.apache.shardingsphere.proxy.backend.exception.NoDatabaseSelectedException;
 import 
org.apache.shardingsphere.proxy.backend.exception.ReadwriteSplittingRuleCreateExistsException;
@@ -173,11 +174,14 @@ public final class MySQLErrPacketFactory {
             return new MySQLErrPacket(1, 
CommonErrorCode.ADD_REPLICA_QUERY_RULE_DATA_SOURCE_EXIST, 
((AddReadwriteSplittingRuleDataSourcesExistedException) cause).getRuleNames());
         }
         if (cause instanceof ReadwriteSplittingRuleCreateExistsException) {
-            return new MySQLErrPacket(1, 
CommonErrorCode.REPLICA_QUERY_RULE_EXIST);
+            return new MySQLErrPacket(1, 
CommonErrorCode.READWRITE_SPLITTING_RULE_EXIST, 
((ReadwriteSplittingRuleCreateExistsException) cause).getSchemaName());
         }
         if (cause instanceof ScalingJobNotFoundException) {
             return new MySQLErrPacket(1, 
CommonErrorCode.SCALING_JOB_NOT_EXIST, ((ScalingJobNotFoundException) 
cause).getJobId());
         }
+        if (cause instanceof InvalidLoadBalancersException) {
+            return new MySQLErrPacket(1, 
CommonErrorCode.INVALID_LOAD_BALANCERS, ((InvalidLoadBalancersException) 
cause).getLoadBalancers());
+        }
         return new MySQLErrPacket(1, CommonErrorCode.UNKNOWN_EXCEPTION, 
cause.getMessage());
     }
 }

Reply via email to