This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ee38f5eacc Add generic type of PipelineJobManager.getJobConfiguration() (#29082)
4ee38f5eacc is described below

commit 4ee38f5eacc2fc43db08ff9cc117fe1cce550ae2
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Nov 19 12:43:03 2023 +0800

    Add generic type of PipelineJobManager.getJobConfiguration() (#29082)
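
    The change turns PipelineJobManager.getJobConfiguration() into a generic method
    (<T extends PipelineJobConfiguration> T) with the unchecked cast moved inside the
    accessor, so call sites can drop their explicit downcasts. Below is a minimal,
    self-contained sketch of the pattern; JobConfig, MigrationConfig, JobRegistry and
    GenericAccessorDemo are hypothetical stand-ins for illustration, not ShardingSphere
    classes.

interface JobConfig {
}

final class MigrationConfig implements JobConfig {

    String getTargetDatabaseName() {
        return "logic_db";
    }
}

final class JobRegistry {

    private final JobConfig stored = new MigrationConfig();

    // Before: callers get the supertype and must downcast themselves.
    JobConfig getConfigRaw() {
        return stored;
    }

    // After: a bounded type parameter moves the (unchecked) cast into the accessor,
    // and the compiler infers T from each call site.
    @SuppressWarnings("unchecked")
    <T extends JobConfig> T getConfig() {
        return (T) stored;
    }
}

public final class GenericAccessorDemo {

    public static void main(final String[] args) {
        JobRegistry registry = new JobRegistry();
        // Before: explicit cast at every call site.
        MigrationConfig before = (MigrationConfig) registry.getConfigRaw();
        // After: T is inferred from the assignment target, no cast needed.
        MigrationConfig after = registry.getConfig();
        System.out.println(before.getTargetDatabaseName() + " / " + after.getTargetDatabaseName());
    }
}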
---
 .../data/pipeline/core/job/service/PipelineJobManager.java          | 6 ++++--
 .../migration/distsql/handler/update/CheckMigrationJobUpdater.java  | 2 +-
 .../apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java | 4 ++--
 .../shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java | 4 ++--
 .../scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java  | 3 +--
 .../data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java  | 6 +++---
 .../consistencycheck/api/impl/ConsistencyCheckJobAPITest.java       | 3 +--
 .../pipeline/scenario/migration/api/impl/MigrationJobAPITest.java   | 6 +++---
 8 files changed, 17 insertions(+), 17 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index eccbdd80227..0e8d2a2d682 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -58,10 +58,12 @@ public final class PipelineJobManager {
      * Get job configuration.
      *
      * @param jobConfigPOJO job configuration POJO
+     * @param <T> type of pipeline job configuration
      * @return pipeline job configuration
      */
-    public PipelineJobConfiguration getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) {
-        return jobAPI.getYamlJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
+    @SuppressWarnings("unchecked")
+    public <T extends PipelineJobConfiguration> T getJobConfiguration(final JobConfigurationPOJO jobConfigPOJO) {
+        return (T) jobAPI.getYamlJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
     }
     
     /**
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
index 86d91508712..f59ed72f6a6 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
@@ -49,7 +49,7 @@ public final class CheckMigrationJobUpdater implements RALUpdater<CheckMigration
         String algorithmTypeName = null == typeStrategy ? null : typeStrategy.getName();
         Properties algorithmProps = null == typeStrategy ? null : typeStrategy.getProps();
         String jobId = sqlStatement.getJobId();
-        MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) new PipelineJobManager(migrationJobAPI).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
+        MigrationJobConfiguration jobConfig = new PipelineJobManager(migrationJobAPI).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
         verifyInventoryFinished(jobConfig);
         checkJobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(jobId, algorithmTypeName, algorithmProps, jobConfig.getSourceDatabaseType(), jobConfig.getTargetDatabaseType()));
     }
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 135d62f7710..6f711ab1992 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -291,7 +291,7 @@ public final class CDCJobAPI implements InventoryIncrementalJobAPI {
     public TableBasedPipelineJobInfo getJobInfo(final String jobId) {
         JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
         PipelineJobMetaData jobMetaData = new PipelineJobMetaData(jobConfigPOJO);
-        CDCJobConfiguration jobConfig = (CDCJobConfiguration) new PipelineJobManager(this).getJobConfiguration(jobConfigPOJO);
+        CDCJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(jobConfigPOJO);
         return new TableBasedPipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(), String.join(", ", jobConfig.getSchemaTableNames()));
     }
     
@@ -306,7 +306,7 @@ public final class CDCJobAPI implements InventoryIncrementalJobAPI {
      */
     public void dropStreaming(final String jobId) {
         JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
-        CDCJobConfiguration jobConfig = (CDCJobConfiguration) new PipelineJobManager(this).getJobConfiguration(jobConfigPOJO);
+        CDCJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(jobConfigPOJO);
         ShardingSpherePreconditions.checkState(jobConfigPOJO.isDisabled(), () -> new PipelineInternalException("Can't drop streaming job which is active"));
         new PipelineJobManager(this).drop(jobId);
         cleanup(jobConfig);
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index bc041d9db81..1f1a9473cd4 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -82,7 +82,7 @@ public final class CDCBackendHandler {
      * @return database
      */
     public String getDatabaseNameByJobId(final String jobId) {
-        return ((CDCJobConfiguration) jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId))).getDatabaseName();
+        return jobManager.<CDCJobConfiguration>getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)).getDatabaseName();
     }
     
     /**
@@ -130,7 +130,7 @@ public final class CDCBackendHandler {
      * @param connectionContext connection context
      */
     public void startStreaming(final String jobId, final CDCConnectionContext connectionContext, final Channel channel) {
-        CDCJobConfiguration cdcJobConfig = (CDCJobConfiguration) jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
+        CDCJobConfiguration cdcJobConfig = jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
         ShardingSpherePreconditions.checkNotNull(cdcJobConfig, () -> new PipelineJobNotFoundException(jobId));
         if (PipelineJobCenter.isJobExisting(jobId)) {
             PipelineJobCenter.stop(jobId);
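
    In CDCBackendHandler.getDatabaseNameByJobId() above (and in MigrationJobAPI.getJobInfo()
    below), the result is chained directly rather than assigned, so there is no target type
    for inference and the call uses an explicit type witness, e.g.
    jobManager.<CDCJobConfiguration>getJobConfiguration(...). A minimal, self-contained
    sketch of why the witness is needed; all names here are hypothetical stand-ins, not
    ShardingSphere classes.

public final class TypeWitnessDemo {

    interface JobConfig {
    }

    static final class CdcConfig implements JobConfig {

        String getDatabaseName() {
            return "sharding_db";
        }
    }

    @SuppressWarnings("unchecked")
    static <T extends JobConfig> T getConfig() {
        return (T) new CdcConfig();
    }

    public static void main(final String[] args) {
        // Without a witness, T is inferred as the bound (JobConfig), which has no
        // getDatabaseName(), so the chained call would not compile:
        // getConfig().getDatabaseName();
        // With an explicit type witness, T is fixed to CdcConfig and the chain compiles.
        System.out.println(TypeWitnessDemo.<CdcConfig>getConfig().getDatabaseName());
    }
}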
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index 93520db26f8..51f8e5edc11 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -268,8 +268,7 @@ public final class ConsistencyCheckJobAPI implements PipelineJobAPI {
     }
     
     private void fillInJobItemInfoWithCheckAlgorithm(final ConsistencyCheckJobItemInfo result, final String checkJobId) {
-        ConsistencyCheckJobConfiguration jobConfig = (ConsistencyCheckJobConfiguration) new PipelineJobManager(this)
-                .getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId));
+        ConsistencyCheckJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId));
         result.setAlgorithmType(jobConfig.getAlgorithmTypeName());
         if (null != jobConfig.getAlgorithmProps()) {
             result.setAlgorithmProps(jobConfig.getAlgorithmProps().entrySet().stream().map(entry -> String.format("'%s'='%s'", entry.getKey(), entry.getValue())).collect(Collectors.joining(",")));
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 4f7a023d21b..c5d551a55ce 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -214,7 +214,7 @@ public final class MigrationJobAPI implements InventoryIncrementalJobAPI {
         JobConfigurationPOJO jobConfigPOJO = PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
         PipelineJobMetaData jobMetaData = new PipelineJobMetaData(jobConfigPOJO);
         List<String> sourceTables = new LinkedList<>();
-        ((MigrationJobConfiguration) new PipelineJobManager(this).getJobConfiguration(jobConfigPOJO)).getJobShardingDataNodes().forEach(each -> each.getEntries().forEach(entry -> entry.getDataNodes()
+        new PipelineJobManager(this).<MigrationJobConfiguration>getJobConfiguration(jobConfigPOJO).getJobShardingDataNodes().forEach(each -> each.getEntries().forEach(entry -> entry.getDataNodes()
                 .forEach(dataNode -> sourceTables.add(DataNodeUtils.formatWithSchema(dataNode)))));
         return new TableBasedPipelineJobInfo(jobMetaData, String.join(",", sourceTables));
     }
@@ -324,7 +324,7 @@ public final class MigrationJobAPI implements InventoryIncrementalJobAPI {
     }
     
     private void cleanTempTableOnRollback(final String jobId) throws SQLException {
-        MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) new PipelineJobManager(this).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
+        MigrationJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
         PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(jobConfig.getTargetDatabaseType());
         TableAndSchemaNameMapper mapping = new TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
         try (
@@ -348,7 +348,7 @@ public final class MigrationJobAPI implements InventoryIncrementalJobAPI {
         PipelineJobManager jobManager = new PipelineJobManager(this);
         jobManager.stop(jobId);
         dropCheckJobs(jobId);
-        MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) new PipelineJobManager(this).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
+        MigrationJobConfiguration jobConfig = new PipelineJobManager(this).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
         refreshTableMetadata(jobId, jobConfig.getTargetDatabaseName());
         jobManager.drop(jobId);
         log.info("Commit cost {} ms", System.currentTimeMillis() - 
startTimeMillis);
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
index 04c35a541bb..7762178b079 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
@@ -69,8 +69,7 @@ class ConsistencyCheckJobAPITest {
         String parentJobId = parentJobConfig.getJobId();
         String checkJobId = jobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(parentJobId, null, null,
                 parentJobConfig.getSourceDatabaseType(), parentJobConfig.getTargetDatabaseType()));
-        ConsistencyCheckJobConfiguration checkJobConfig = (ConsistencyCheckJobConfiguration) new PipelineJobManager(jobAPI)
-                .getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId));
+        ConsistencyCheckJobConfiguration checkJobConfig = new PipelineJobManager(jobAPI).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId));
         int expectedSequence = ConsistencyCheckSequence.MIN_SEQUENCE;
         String expectCheckJobId = new ConsistencyCheckJobId(PipelineJobIdUtils.parseContextKey(parentJobId), parentJobId, expectedSequence).marshal();
         assertThat(checkJobConfig.getJobId(), is(expectCheckJobId));
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index e70b05acc01..3e67089680a 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -150,7 +150,7 @@ class MigrationJobAPITest {
     void assertRollback() throws SQLException {
         Optional<String> jobId = jobManager.start(JobConfigurationBuilder.createJobConfiguration());
         assertTrue(jobId.isPresent());
-        MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId.get()));
+        MigrationJobConfiguration jobConfig = jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId.get()));
         initTableData(jobConfig);
         PipelineDistributedBarrier mockBarrier = mock(PipelineDistributedBarrier.class);
         when(PipelineDistributedBarrier.getInstance(any())).thenReturn(mockBarrier);
@@ -162,7 +162,7 @@ class MigrationJobAPITest {
     void assertCommit() {
         Optional<String> jobId = jobManager.start(JobConfigurationBuilder.createJobConfiguration());
         assertTrue(jobId.isPresent());
-        MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId.get()));
+        MigrationJobConfiguration jobConfig = jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId.get()));
         initTableData(jobConfig);
         PipelineDistributedBarrier mockBarrier = mock(PipelineDistributedBarrier.class);
         when(PipelineDistributedBarrier.getInstance(any())).thenReturn(mockBarrier);
@@ -285,7 +285,7 @@ class MigrationJobAPITest {
         initIntPrimaryEnvironment();
         SourceTargetEntry sourceTargetEntry = new SourceTargetEntry("logic_db", new DataNode("ds_0", "t_order"), "t_order");
         String jobId = jobAPI.createJobAndStart(PipelineContextUtils.getContextKey(), new MigrateTableStatement(Collections.singletonList(sourceTargetEntry), "logic_db"));
-        MigrationJobConfiguration actual = (MigrationJobConfiguration) jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
+        MigrationJobConfiguration actual = jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
         assertThat(actual.getTargetDatabaseName(), is("logic_db"));
         List<JobDataNodeLine> dataNodeLines = actual.getJobShardingDataNodes();
         assertThat(dataNodeLines.size(), is(1));
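
    One trade-off worth noting: the @SuppressWarnings("unchecked") cast inside
    getJobConfiguration() defers type checking to runtime, so a call site that asks for
    the wrong configuration subtype still compiles but fails with a ClassCastException at
    the call site. A minimal, self-contained sketch of that failure mode; the names are
    hypothetical stand-ins, not ShardingSphere classes.

public final class UncheckedCastCaveat {

    interface JobConfig {
    }

    static final class MigrationConfig implements JobConfig {
    }

    static final class CdcConfig implements JobConfig {
    }

    @SuppressWarnings("unchecked")
    static <T extends JobConfig> T getConfig() {
        // The stored configuration is actually a MigrationConfig.
        return (T) new MigrationConfig();
    }

    public static void main(final String[] args) {
        // Compiles, because the unchecked cast defers the check to runtime; the
        // compiler-inserted checkcast to CdcConfig then throws ClassCastException.
        CdcConfig wrong = getConfig();
        System.out.println(wrong);
    }
}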
