This is an automated email from the ASF dual-hosted git repository.
jianglongtao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new e81a05f Add TypedSingletonSPIHolder; Update CreateShardingScalingRuleStatementUpdater (#14917)
e81a05f is described below
commit e81a05f1814b70dfeac4067f08183b61f878990b
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Jan 20 10:28:03 2022 +0800
Add TypedSingletonSPIHolder; Update CreateShardingScalingRuleStatementUpdater (#14917)
* Add TypedSingletonSPIHolder
* Add unit test
---
.gitignore | 1 +
.../CreateShardingScalingRuleStatementUpdater.java | 31 ++++++-------
...ateShardingScalingRuleStatementUpdaterTest.java | 15 ++++---
.../spi/detect/JobCompletionDetectAlgorithm.java | 3 +-
.../spi/ingest/channel/PipelineChannelFactory.java | 3 +-
.../spi/ratelimit/JobRateLimitAlgorithm.java | 3 +-
.../spi/singleton/SingletonSPIRegistry.java | 3 ++
.../spi/singleton/TypedSingletonSPIHolder.java | 52 ++++++++++++++++++++++
.../spi/singleton/TypedSingletonSPIHolderTest.java | 41 +++++++++++++++++
9 files changed, 129 insertions(+), 23 deletions(-)
diff --git a/.gitignore b/.gitignore
index 143d572..336008c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -8,6 +8,7 @@ target/
*.tar.gz
dependency-reduced-pom.xml
.flattened-pom.xml
+pom.xml.versionsBackup
# maven plugin ignore
release.properties
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/CreateShardingScalingRuleStatementUpdater.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/CreateShardingScalingRuleStatementUpdater.java
index 4d6f114..243a065 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/CreateShardingScalingRuleStatementUpdater.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/CreateShardingScalingRuleStatementUpdater.java
@@ -33,27 +33,27 @@ import
org.apache.shardingsphere.scaling.distsql.handler.converter.ShardingScali
import
org.apache.shardingsphere.scaling.distsql.statement.CreateShardingScalingRuleStatement;
import
org.apache.shardingsphere.scaling.distsql.statement.segment.ShardingScalingRuleConfigurationSegment;
import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
-import org.apache.shardingsphere.spi.ShardingSphereServiceLoader;
+import org.apache.shardingsphere.spi.singleton.SingletonSPI;
+import org.apache.shardingsphere.spi.singleton.TypedSingletonSPIHolder;
import org.apache.shardingsphere.spi.typed.TypedSPI;
-import org.apache.shardingsphere.spi.typed.TypedSPIRegistry;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
-import java.util.Properties;
/**
* Create sharding scaling rule statement updater.
*/
public final class CreateShardingScalingRuleStatementUpdater implements
RuleDefinitionCreateUpdater<CreateShardingScalingRuleStatement,
ShardingRuleConfiguration> {
- static {
- ShardingSphereServiceLoader.register(JobRateLimitAlgorithm.class);
- ShardingSphereServiceLoader.register(PipelineChannelFactory.class);
-
ShardingSphereServiceLoader.register(JobCompletionDetectAlgorithm.class);
-
ShardingSphereServiceLoader.register(DataConsistencyCheckAlgorithm.class);
- }
+ private static final TypedSingletonSPIHolder<JobRateLimitAlgorithm>
RATE_LIMIT_ALGORITHM_HOLDER = new
TypedSingletonSPIHolder<>(JobRateLimitAlgorithm.class, false);
+
+ private static final TypedSingletonSPIHolder<PipelineChannelFactory>
PIPELINE_CHANNEL_FACTORY_HOLDER = new
TypedSingletonSPIHolder<>(PipelineChannelFactory.class, false);
+
+ private static final TypedSingletonSPIHolder<JobCompletionDetectAlgorithm>
COMPLETION_DETECT_ALGORITHM_HOLDER = new
TypedSingletonSPIHolder<>(JobCompletionDetectAlgorithm.class, false);
+
+ private static final
TypedSingletonSPIHolder<DataConsistencyCheckAlgorithm>
DATA_CONSISTENCY_CHECK_ALGORITHM_HOLDER = new
TypedSingletonSPIHolder<>(DataConsistencyCheckAlgorithm.class, false);
@Override
public void checkSQLStatement(final ShardingSphereMetaData
shardingSphereMetaData, final CreateShardingScalingRuleStatement sqlStatement,
@@ -96,29 +96,30 @@ public final class
CreateShardingScalingRuleStatementUpdater implements RuleDefi
}
private void checkRateLimiterAlgorithm(final AlgorithmSegment rateLimiter)
throws DistSQLException {
- checkAlgorithm(JobRateLimitAlgorithm.class, "rate limiter",
rateLimiter);
+ checkAlgorithm(RATE_LIMIT_ALGORITHM_HOLDER, "rate limiter",
rateLimiter);
}
private void checkStreamChannelExist(final
ShardingScalingRuleConfigurationSegment segment) throws DistSQLException {
if (null != segment.getStreamChannel()) {
- checkAlgorithm(PipelineChannelFactory.class, "stream channel",
segment.getStreamChannel());
+ checkAlgorithm(PIPELINE_CHANNEL_FACTORY_HOLDER, "stream channel",
segment.getStreamChannel());
}
}
private void checkCompletionDetectorExist(final
ShardingScalingRuleConfigurationSegment segment) throws DistSQLException {
if (null != segment.getCompletionDetector()) {
- checkAlgorithm(JobCompletionDetectAlgorithm.class, "completion
detector", segment.getCompletionDetector());
+ checkAlgorithm(COMPLETION_DETECT_ALGORITHM_HOLDER, "completion
detector", segment.getCompletionDetector());
}
}
private void checkDataConsistencyCheckerExist(final
ShardingScalingRuleConfigurationSegment segment) throws DistSQLException {
if (null != segment.getDataConsistencyChecker()) {
- checkAlgorithm(DataConsistencyCheckAlgorithm.class, "data
consistency checker", segment.getDataConsistencyChecker());
+ checkAlgorithm(DATA_CONSISTENCY_CHECK_ALGORITHM_HOLDER, "data
consistency checker", segment.getDataConsistencyChecker());
}
}
- private <T extends TypedSPI> void checkAlgorithm(final Class<T>
typedSPIClass, final String algorithmType, final AlgorithmSegment segment)
throws DistSQLException {
- Optional<T> service =
TypedSPIRegistry.findRegisteredService(typedSPIClass, segment.getName(), new
Properties());
+ private <T extends TypedSPI & SingletonSPI> void checkAlgorithm(
+ final TypedSingletonSPIHolder<T> singletonSPIHolder, final String
algorithmType, final AlgorithmSegment segment) throws DistSQLException {
+ Optional<T> service = singletonSPIHolder.get(segment.getName());
if (!service.isPresent()) {
throw new InvalidAlgorithmConfigurationException(algorithmType,
segment.getName());
}
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/scaling/distsql/handler/CreateShardingScalingRuleStatementUpdaterTest.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/scaling/distsql/handler/CreateShardingScalingRuleStatementUpdaterTest.java
index 71fd909..2e3582b 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/scaling/distsql/handler/CreateShardingScalingRuleStatementUpdaterTest.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/test/java/org/apache/shardingsphere/scaling/distsql/handler/CreateShardingScalingRuleStatementUpdaterTest.java
@@ -41,6 +41,8 @@ import static org.junit.Assert.assertThat;
@RunWith(MockitoJUnitRunner.class)
public final class CreateShardingScalingRuleStatementUpdaterTest {
+ private static final String QPS_TYPE = "qps";
+
@Mock
private ShardingSphereMetaData shardingSphereMetaData;
@@ -101,7 +103,7 @@ public final class
CreateShardingScalingRuleStatementUpdaterTest {
ShardingRuleConfiguration currentRuleConfig = new
ShardingRuleConfiguration();
CreateShardingScalingRuleStatement statement = new
CreateShardingScalingRuleStatement("default_scaling");
statement.setConfigurationSegment(createCompleteConfiguration());
- updater.checkSQLStatement(shardingSphereMetaData,
createSQLStatement("default_scaling"), currentRuleConfig);
+ updater.checkSQLStatement(shardingSphereMetaData, statement,
currentRuleConfig);
}
@Test
@@ -120,7 +122,7 @@ public final class
CreateShardingScalingRuleStatementUpdaterTest {
String key = result.getScaling().keySet().iterator().next();
assertThat(key, is("default_scaling"));
OnRuleAlteredActionConfiguration value = result.getScaling().get(key);
- assertThat(value.getInput().getRateLimiter().getType(), is("QPS"));
+ assertThat(value.getInput().getRateLimiter().getType(), is(QPS_TYPE));
assertThat(value.getOutput().getRateLimiter().getType(), is("TPS"));
assertThat(value.getStreamChannel().getType(), is("MEMORY"));
assertThat(value.getCompletionDetector().getType(), is("IDLE"));
@@ -139,11 +141,12 @@ public final class
CreateShardingScalingRuleStatementUpdaterTest {
String key = currentRuleConfig.getScaling().keySet().iterator().next();
assertThat(key, is("default_scaling"));
OnRuleAlteredActionConfiguration value =
currentRuleConfig.getScaling().get(key);
- assertThat(value.getInput().getRateLimiter().getType(), is("QPS"));
+ assertThat(value.getInput().getRateLimiter().getType(), is(QPS_TYPE));
assertThat(value.getOutput().getRateLimiter().getType(), is("TPS"));
assertThat(value.getStreamChannel().getType(), is("MEMORY"));
assertThat(value.getCompletionDetector().getType(), is("IDLE"));
assertThat(value.getDataConsistencyChecker().getType(),
is("DATA_MATCH"));
+
assertThat(value.getDataConsistencyChecker().getProps().getProperty("chunk-size"),
is("1000"));
}
private CreateShardingScalingRuleStatement createSQLStatement(final String
scalingName) {
@@ -181,11 +184,13 @@ public final class
CreateShardingScalingRuleStatementUpdaterTest {
private ShardingScalingRuleConfigurationSegment
createCompleteConfiguration() {
ShardingScalingRuleConfigurationSegment result = new
ShardingScalingRuleConfigurationSegment();
- result.setInputSegment(createInputOrOutputSegment("QPS"));
+ result.setInputSegment(createInputOrOutputSegment(QPS_TYPE));
result.setOutputSegment(createInputOrOutputSegment("TPS"));
result.setStreamChannel(createAlgorithmSegment("MEMORY"));
result.setCompletionDetector(createAlgorithmSegment("IDLE"));
- result.setDataConsistencyChecker(createAlgorithmSegment("DATA_MATCH"));
+ AlgorithmSegment dataConsistencyChecker =
createAlgorithmSegment("DATA_MATCH");
+ dataConsistencyChecker.getProps().setProperty("chunk-size", "1000");
+ result.setDataConsistencyChecker(dataConsistencyChecker);
return result;
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/detect/JobCompletionDetectAlgorithm.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/detect/JobCompletionDetectAlgorithm.java
index 86dd8ca..fa9d554 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/detect/JobCompletionDetectAlgorithm.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/detect/JobCompletionDetectAlgorithm.java
@@ -19,13 +19,14 @@ package org.apache.shardingsphere.data.pipeline.spi.detect;
import
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithm;
import
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmPostProcessor;
+import org.apache.shardingsphere.spi.singleton.SingletonSPI;
/**
* Job completion detect algorithm, SPI.
*
* @param <T> completion detect parameter type
*/
-public interface JobCompletionDetectAlgorithm<T> extends
ShardingSphereAlgorithm, ShardingSphereAlgorithmPostProcessor {
+public interface JobCompletionDetectAlgorithm<T> extends
ShardingSphereAlgorithm, ShardingSphereAlgorithmPostProcessor, SingletonSPI {
/**
* Whether job is almost completed.
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/channel/PipelineChannelFactory.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/channel/PipelineChannelFactory.java
index fafef82..eec3ec1 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/channel/PipelineChannelFactory.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ingest/channel/PipelineChannelFactory.java
@@ -21,11 +21,12 @@ import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.AckCallback;
import
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithm;
import
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmPostProcessor;
+import org.apache.shardingsphere.spi.singleton.SingletonSPI;
/**
* Pipeline channel factory, SPI.
*/
-public interface PipelineChannelFactory extends ShardingSphereAlgorithm,
ShardingSphereAlgorithmPostProcessor {
+public interface PipelineChannelFactory extends ShardingSphereAlgorithm,
ShardingSphereAlgorithmPostProcessor, SingletonSPI {
/**
* Create pipeline channel.
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ratelimit/JobRateLimitAlgorithm.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ratelimit/JobRateLimitAlgorithm.java
index d0df9a5..a6d035d 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ratelimit/JobRateLimitAlgorithm.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-spi/src/main/java/org/apache/shardingsphere/data/pipeline/spi/ratelimit/JobRateLimitAlgorithm.java
@@ -20,11 +20,12 @@ package
org.apache.shardingsphere.data.pipeline.spi.ratelimit;
import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
import
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithm;
import
org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithmPostProcessor;
+import org.apache.shardingsphere.spi.singleton.SingletonSPI;
/**
* Job rate limit algorithm, SPI.
*/
-public interface JobRateLimitAlgorithm extends ShardingSphereAlgorithm,
ShardingSphereAlgorithmPostProcessor {
+public interface JobRateLimitAlgorithm extends ShardingSphereAlgorithm,
ShardingSphereAlgorithmPostProcessor, SingletonSPI {
/**
* Intercept.
diff --git
a/shardingsphere-spi/src/main/java/org/apache/shardingsphere/spi/singleton/SingletonSPIRegistry.java
b/shardingsphere-spi/src/main/java/org/apache/shardingsphere/spi/singleton/SingletonSPIRegistry.java
index 1c7e085..b9bdc62 100644
---
a/shardingsphere-spi/src/main/java/org/apache/shardingsphere/spi/singleton/SingletonSPIRegistry.java
+++
b/shardingsphere-spi/src/main/java/org/apache/shardingsphere/spi/singleton/SingletonSPIRegistry.java
@@ -50,6 +50,9 @@ public final class SingletonSPIRegistry {
/**
* Get typed singleton instances map.
+ * <p>
+ * Notice: Map key is {@linkplain TypedSPI#getType()}, it won't be
converted to upper case or lower case. If type is case-insensitive, then try
{@linkplain TypedSingletonSPIHolder}.
+ * </p>
*
* @param singletonSPIClass singleton SPI class
* @param <T> the type of the input elements
diff --git
a/shardingsphere-spi/src/main/java/org/apache/shardingsphere/spi/singleton/TypedSingletonSPIHolder.java
b/shardingsphere-spi/src/main/java/org/apache/shardingsphere/spi/singleton/TypedSingletonSPIHolder.java
new file mode 100644
index 0000000..65fd650
--- /dev/null
+++
b/shardingsphere-spi/src/main/java/org/apache/shardingsphere/spi/singleton/TypedSingletonSPIHolder.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.spi.singleton;
+
+import org.apache.shardingsphere.spi.typed.TypedSPI;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Typed singleton SPI holder.
+ */
+public final class TypedSingletonSPIHolder<T extends TypedSPI & SingletonSPI> {
+
+ private final Map<String, T> singletonSPIMap;
+
+ private final boolean typeCaseSensitive;
+
+ public TypedSingletonSPIHolder(final Class<T> singletonSPIClass, final
boolean typeCaseSensitive) {
+ this.singletonSPIMap =
SingletonSPIRegistry.getSingletonInstancesMap(singletonSPIClass, t ->
getTypeKey(t.getType(), typeCaseSensitive));
+ this.typeCaseSensitive = typeCaseSensitive;
+ }
+
+ private String getTypeKey(final String type, final boolean
typeCaseSensitive) {
+ return typeCaseSensitive ? type : type.toUpperCase();
+ }
+
+ /**
+ * Get typed singleton SPI instance.
+ *
+ * @param type SPI type
+ * @return typed singleton SPI instance
+ */
+ public Optional<T> get(final String type) {
+ return Optional.ofNullable(singletonSPIMap.get(getTypeKey(type,
typeCaseSensitive)));
+ }
+}
diff --git
a/shardingsphere-spi/src/test/java/org/apache/shardingsphere/spi/singleton/TypedSingletonSPIHolderTest.java
b/shardingsphere-spi/src/test/java/org/apache/shardingsphere/spi/singleton/TypedSingletonSPIHolderTest.java
new file mode 100644
index 0000000..648d207
--- /dev/null
+++
b/shardingsphere-spi/src/test/java/org/apache/shardingsphere/spi/singleton/TypedSingletonSPIHolderTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.spi.singleton;
+
+import org.apache.shardingsphere.spi.fixture.singleton.SingletonSPIFixture;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+public final class TypedSingletonSPIHolderTest {
+
+ @Test
+ public void assertGetOnTypeCaseSensitiveTrue() {
+ TypedSingletonSPIHolder<SingletonSPIFixture> spiHolder = new
TypedSingletonSPIHolder<>(SingletonSPIFixture.class, true);
+ assertThat(spiHolder.get("SINGLETON_FIXTURE").isPresent(), is(true));
+ assertThat(spiHolder.get("singleton_fixture").isPresent(), is(false));
+ }
+
+ @Test
+ public void assertGetOnTypeCaseSensitiveFalse() {
+ TypedSingletonSPIHolder<SingletonSPIFixture> spiHolder = new
TypedSingletonSPIHolder<>(SingletonSPIFixture.class, false);
+ assertThat(spiHolder.get("SINGLETON_FIXTURE").isPresent(), is(true));
+ assertThat(spiHolder.get("singleton_fixture").isPresent(), is(true));
+ }
+}