This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new a76d0d70f52 Add DataConsistencyCheckAlgorithmInfoRegistry (#29084)
a76d0d70f52 is described below
commit a76d0d70f522e5ab2a803d7f85a763e9e9f35c5f
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Nov 19 15:43:31 2023 +0800
Add DataConsistencyCheckAlgorithmInfoRegistry (#29084)
---
.../DataConsistencyCheckAlgorithmInfoRegistry.java | 61 ++++++++++++++++++++++
.../service/InventoryIncrementalJobManager.java | 32 +-----------
.../ShowMigrationCheckAlgorithmsExecutor.java | 8 +--
3 files changed, 65 insertions(+), 36 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/pojo/DataConsistencyCheckAlgorithmInfoRegistry.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/pojo/DataConsistencyCheckAlgorithmInfoRegistry.java
new file mode 100644
index 00000000000..efde834078d
--- /dev/null
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/pojo/DataConsistencyCheckAlgorithmInfoRegistry.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.common.pojo;
+
+import lombok.NoArgsConstructor;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
+import org.apache.shardingsphere.infra.spi.annotation.SPIDescription;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.stream.Collectors;
+
+/**
+ * Data consistency check algorithm info registry.
+ */
+@NoArgsConstructor
+public final class DataConsistencyCheckAlgorithmInfoRegistry {
+
+ private static final Collection<DataConsistencyCheckAlgorithmInfo>
ALGORITHM_INFOS = loadAllAlgorithms();
+
+ private static Collection<DataConsistencyCheckAlgorithmInfo>
loadAllAlgorithms() {
+ Collection<DataConsistencyCheckAlgorithmInfo> result = new
LinkedList<>();
+ for (TableDataConsistencyChecker each :
ShardingSphereServiceLoader.getServiceInstances(TableDataConsistencyChecker.class))
{
+ SPIDescription description =
each.getClass().getAnnotation(SPIDescription.class);
+ String typeAliases =
each.getTypeAliases().stream().map(Object::toString).collect(Collectors.joining(","));
+ result.add(
+ new DataConsistencyCheckAlgorithmInfo(each.getType(),
typeAliases, getSupportedDatabaseTypes(each.getSupportedDatabaseTypes()), null
== description ? "" : description.value()));
+ }
+ return result;
+ }
+
+ private static Collection<DatabaseType> getSupportedDatabaseTypes(final
Collection<DatabaseType> supportedDatabaseTypes) {
+ return supportedDatabaseTypes.isEmpty() ?
ShardingSphereServiceLoader.getServiceInstances(DatabaseType.class) :
supportedDatabaseTypes;
+ }
+
+ /**
+ * Get all data consistency check algorithm infos.
+ *
+ * @return all data consistency check algorithm infos
+ */
+ public static Collection<DataConsistencyCheckAlgorithmInfo>
getAllAlgorithmInfos() {
+ return ALGORITHM_INFOS;
+ }
+}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
index 764866c717e..2ef78b7e8bf 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/InventoryIncrementalJobManager.java
@@ -28,27 +28,20 @@ import
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncr
import
org.apache.shardingsphere.data.pipeline.common.job.progress.JobOffsetInfo;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfo;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlJobOffsetInfoSwapper;
-import
org.apache.shardingsphere.data.pipeline.common.pojo.DataConsistencyCheckAlgorithmInfo;
import
org.apache.shardingsphere.data.pipeline.common.pojo.InventoryIncrementalJobItemInfo;
import
org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
-import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
-import org.apache.shardingsphere.infra.spi.annotation.SPIDescription;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
-import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
-import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
@@ -89,11 +82,9 @@ public final class InventoryIncrementalJobManager {
* @return job item infos
*/
public List<InventoryIncrementalJobItemInfo> getJobItemInfos(final String
jobId) {
- PipelineJobManager jobManager = new PipelineJobManager(jobAPI);
- PipelineJobConfiguration jobConfig =
jobManager.getJobConfiguration(jobId);
+ PipelineJobConfiguration jobConfig = new
PipelineJobManager(jobAPI).getJobConfiguration(jobId);
long startTimeMillis =
Long.parseLong(Optional.ofNullable(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId).getProps().getProperty("start_time_millis")).orElse("0"));
- InventoryIncrementalJobManager inventoryIncrementalJobManager = new
InventoryIncrementalJobManager(jobAPI);
- Map<Integer, InventoryIncrementalJobItemProgress> jobProgress =
inventoryIncrementalJobManager.getJobProgress(jobConfig);
+ Map<Integer, InventoryIncrementalJobItemProgress> jobProgress =
getJobProgress(jobConfig);
List<InventoryIncrementalJobItemInfo> result = new LinkedList<>();
PipelineJobItemManager<InventoryIncrementalJobItemProgress>
jobItemManager = new
PipelineJobItemManager<>(jobAPI.getYamlJobItemProgressSwapper());
for (Entry<Integer, InventoryIncrementalJobItemProgress> entry :
jobProgress.entrySet()) {
@@ -155,25 +146,6 @@ public final class InventoryIncrementalJobManager {
return new
YamlJobOffsetInfoSwapper().swapToObject(offsetInfo.isPresent() ?
YamlEngine.unmarshal(offsetInfo.get(), YamlJobOffsetInfo.class) : new
YamlJobOffsetInfo());
}
- /**
- * List all data consistency check algorithms from SPI.
- *
- * @return data consistency check algorithms
- */
- public Collection<DataConsistencyCheckAlgorithmInfo>
listDataConsistencyCheckAlgorithms() {
- Collection<DataConsistencyCheckAlgorithmInfo> result = new
LinkedList<>();
- for (TableDataConsistencyChecker each :
ShardingSphereServiceLoader.getServiceInstances(TableDataConsistencyChecker.class))
{
- SPIDescription description =
each.getClass().getAnnotation(SPIDescription.class);
- String typeAliases =
each.getTypeAliases().stream().map(Object::toString).collect(Collectors.joining(","));
- result.add(new DataConsistencyCheckAlgorithmInfo(each.getType(),
typeAliases, getSupportedDatabaseTypes(each.getSupportedDatabaseTypes()), null
== description ? "" : description.value()));
- }
- return result;
- }
-
- private Collection<DatabaseType> getSupportedDatabaseTypes(final
Collection<DatabaseType> supportedDatabaseTypes) {
- return supportedDatabaseTypes.isEmpty() ?
ShardingSphereServiceLoader.getServiceInstances(DatabaseType.class) :
supportedDatabaseTypes;
- }
-
/**
* Aggregate data consistency check results.
*
diff --git
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckAlgorithmsExecutor.java
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckAlgorithmsExecutor.java
index 425d94e993b..23ac3a43c7e 100644
---
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckAlgorithmsExecutor.java
+++
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationCheckAlgorithmsExecutor.java
@@ -17,13 +17,10 @@
package org.apache.shardingsphere.migration.distsql.handler.query;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobAPI;
-import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncrementalJobManager;
-import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
+import
org.apache.shardingsphere.data.pipeline.common.pojo.DataConsistencyCheckAlgorithmInfoRegistry;
import
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import
org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
-import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import
org.apache.shardingsphere.migration.distsql.statement.ShowMigrationCheckAlgorithmsStatement;
import java.util.Arrays;
@@ -37,8 +34,7 @@ public final class ShowMigrationCheckAlgorithmsExecutor
implements QueryableRALE
@Override
public Collection<LocalDataQueryResultRow> getRows(final
ShowMigrationCheckAlgorithmsStatement sqlStatement) {
- InventoryIncrementalJobManager inventoryIncrementalJobManager = new
InventoryIncrementalJobManager((InventoryIncrementalJobAPI)
TypedSPILoader.getService(PipelineJobAPI.class, "MIGRATION"));
- return
inventoryIncrementalJobManager.listDataConsistencyCheckAlgorithms().stream().map(
+ return
DataConsistencyCheckAlgorithmInfoRegistry.getAllAlgorithmInfos().stream().map(
each -> new LocalDataQueryResultRow(each.getType(),
each.getTypeAliases(),
each.getSupportedDatabaseTypes().stream().map(DatabaseType::getType).collect(Collectors.joining(",")),
each.getDescription()))
.collect(Collectors.toList());