This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 4ee38f5eacc Add generic type of PipelineJobManager.getJobConfiguration() (#29082)
4ee38f5eacc is described below
commit 4ee38f5eacc2fc43db08ff9cc117fe1cce550ae2
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Nov 19 12:43:03 2023 +0800
Add generic type of PipelineJobManager.getJobConfiguration() (#29082)
---
.../data/pipeline/core/job/service/PipelineJobManager.java | 6 ++++--
.../migration/distsql/handler/update/CheckMigrationJobUpdater.java | 2 +-
.../apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java | 4 ++--
.../shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java | 4 ++--
.../scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java | 3 +--
.../data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java | 6 +++---
.../consistencycheck/api/impl/ConsistencyCheckJobAPITest.java | 3 +--
.../pipeline/scenario/migration/api/impl/MigrationJobAPITest.java | 6 +++---
8 files changed, 17 insertions(+), 17 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index eccbdd80227..0e8d2a2d682 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -58,10 +58,12 @@ public final class PipelineJobManager {
* Get job configuration.
*
* @param jobConfigPOJO job configuration POJO
+ * @param <T> type of pipeline job configuration
* @return pipeline job configuration
*/
- public PipelineJobConfiguration getJobConfiguration(final
JobConfigurationPOJO jobConfigPOJO) {
- return
jobAPI.getYamlJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
+ @SuppressWarnings("unchecked")
+ public <T extends PipelineJobConfiguration> T getJobConfiguration(final
JobConfigurationPOJO jobConfigPOJO) {
+ return (T)
jobAPI.getYamlJobConfigurationSwapper().swapToObject(jobConfigPOJO.getJobParameter());
}
/**
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
index 86d91508712..f59ed72f6a6 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
@@ -49,7 +49,7 @@ public final class CheckMigrationJobUpdater implements
RALUpdater<CheckMigration
String algorithmTypeName = null == typeStrategy ? null :
typeStrategy.getName();
Properties algorithmProps = null == typeStrategy ? null :
typeStrategy.getProps();
String jobId = sqlStatement.getJobId();
- MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) new
PipelineJobManager(migrationJobAPI).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
+ MigrationJobConfiguration jobConfig = new
PipelineJobManager(migrationJobAPI).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
verifyInventoryFinished(jobConfig);
checkJobAPI.createJobAndStart(new
CreateConsistencyCheckJobParameter(jobId, algorithmTypeName, algorithmProps,
jobConfig.getSourceDatabaseType(), jobConfig.getTargetDatabaseType()));
}
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 135d62f7710..6f711ab1992 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -291,7 +291,7 @@ public final class CDCJobAPI implements
InventoryIncrementalJobAPI {
public TableBasedPipelineJobInfo getJobInfo(final String jobId) {
JobConfigurationPOJO jobConfigPOJO =
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
PipelineJobMetaData jobMetaData = new
PipelineJobMetaData(jobConfigPOJO);
- CDCJobConfiguration jobConfig = (CDCJobConfiguration) new
PipelineJobManager(this).getJobConfiguration(jobConfigPOJO);
+ CDCJobConfiguration jobConfig = new
PipelineJobManager(this).getJobConfiguration(jobConfigPOJO);
return new TableBasedPipelineJobInfo(jobMetaData,
jobConfig.getDatabaseName(), String.join(", ",
jobConfig.getSchemaTableNames()));
}
@@ -306,7 +306,7 @@ public final class CDCJobAPI implements
InventoryIncrementalJobAPI {
*/
public void dropStreaming(final String jobId) {
JobConfigurationPOJO jobConfigPOJO =
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
- CDCJobConfiguration jobConfig = (CDCJobConfiguration) new
PipelineJobManager(this).getJobConfiguration(jobConfigPOJO);
+ CDCJobConfiguration jobConfig = new
PipelineJobManager(this).getJobConfiguration(jobConfigPOJO);
ShardingSpherePreconditions.checkState(jobConfigPOJO.isDisabled(), ()
-> new PipelineInternalException("Can't drop streaming job which is active"));
new PipelineJobManager(this).drop(jobId);
cleanup(jobConfig);
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
index bc041d9db81..1f1a9473cd4 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/handler/CDCBackendHandler.java
@@ -82,7 +82,7 @@ public final class CDCBackendHandler {
* @return database
*/
public String getDatabaseNameByJobId(final String jobId) {
- return ((CDCJobConfiguration)
jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId))).getDatabaseName();
+ return
jobManager.<CDCJobConfiguration>getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId)).getDatabaseName();
}
/**
@@ -130,7 +130,7 @@ public final class CDCBackendHandler {
* @param connectionContext connection context
*/
public void startStreaming(final String jobId, final CDCConnectionContext
connectionContext, final Channel channel) {
- CDCJobConfiguration cdcJobConfig = (CDCJobConfiguration)
jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
+ CDCJobConfiguration cdcJobConfig =
jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
ShardingSpherePreconditions.checkNotNull(cdcJobConfig, () -> new
PipelineJobNotFoundException(jobId));
if (PipelineJobCenter.isJobExisting(jobId)) {
PipelineJobCenter.stop(jobId);
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index 93520db26f8..51f8e5edc11 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -268,8 +268,7 @@ public final class ConsistencyCheckJobAPI implements
PipelineJobAPI {
}
private void fillInJobItemInfoWithCheckAlgorithm(final
ConsistencyCheckJobItemInfo result, final String checkJobId) {
- ConsistencyCheckJobConfiguration jobConfig =
(ConsistencyCheckJobConfiguration) new PipelineJobManager(this)
-
.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId));
+ ConsistencyCheckJobConfiguration jobConfig = new
PipelineJobManager(this).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId));
result.setAlgorithmType(jobConfig.getAlgorithmTypeName());
if (null != jobConfig.getAlgorithmProps()) {
result.setAlgorithmProps(jobConfig.getAlgorithmProps().entrySet().stream().map(entry
-> String.format("'%s'='%s'", entry.getKey(),
entry.getValue())).collect(Collectors.joining(",")));
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 4f7a023d21b..c5d551a55ce 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -214,7 +214,7 @@ public final class MigrationJobAPI implements
InventoryIncrementalJobAPI {
JobConfigurationPOJO jobConfigPOJO =
PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId);
PipelineJobMetaData jobMetaData = new
PipelineJobMetaData(jobConfigPOJO);
List<String> sourceTables = new LinkedList<>();
- ((MigrationJobConfiguration) new
PipelineJobManager(this).getJobConfiguration(jobConfigPOJO)).getJobShardingDataNodes().forEach(each
-> each.getEntries().forEach(entry -> entry.getDataNodes()
+ new
PipelineJobManager(this).<MigrationJobConfiguration>getJobConfiguration(jobConfigPOJO).getJobShardingDataNodes().forEach(each
-> each.getEntries().forEach(entry -> entry.getDataNodes()
.forEach(dataNode ->
sourceTables.add(DataNodeUtils.formatWithSchema(dataNode)))));
return new TableBasedPipelineJobInfo(jobMetaData, String.join(",",
sourceTables));
}
@@ -324,7 +324,7 @@ public final class MigrationJobAPI implements
InventoryIncrementalJobAPI {
}
private void cleanTempTableOnRollback(final String jobId) throws
SQLException {
- MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) new
PipelineJobManager(this).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
+ MigrationJobConfiguration jobConfig = new
PipelineJobManager(this).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
PipelineCommonSQLBuilder pipelineSQLBuilder = new
PipelineCommonSQLBuilder(jobConfig.getTargetDatabaseType());
TableAndSchemaNameMapper mapping = new
TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
try (
@@ -348,7 +348,7 @@ public final class MigrationJobAPI implements
InventoryIncrementalJobAPI {
PipelineJobManager jobManager = new PipelineJobManager(this);
jobManager.stop(jobId);
dropCheckJobs(jobId);
- MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) new
PipelineJobManager(this).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
+ MigrationJobConfiguration jobConfig = new
PipelineJobManager(this).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
refreshTableMetadata(jobId, jobConfig.getTargetDatabaseName());
jobManager.drop(jobId);
log.info("Commit cost {} ms", System.currentTimeMillis() -
startTimeMillis);
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
index 04c35a541bb..7762178b079 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
@@ -69,8 +69,7 @@ class ConsistencyCheckJobAPITest {
String parentJobId = parentJobConfig.getJobId();
String checkJobId = jobAPI.createJobAndStart(new
CreateConsistencyCheckJobParameter(parentJobId, null, null,
parentJobConfig.getSourceDatabaseType(),
parentJobConfig.getTargetDatabaseType()));
- ConsistencyCheckJobConfiguration checkJobConfig =
(ConsistencyCheckJobConfiguration) new PipelineJobManager(jobAPI)
-
.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId));
+ ConsistencyCheckJobConfiguration checkJobConfig = new
PipelineJobManager(jobAPI).getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(checkJobId));
int expectedSequence = ConsistencyCheckSequence.MIN_SEQUENCE;
String expectCheckJobId = new
ConsistencyCheckJobId(PipelineJobIdUtils.parseContextKey(parentJobId),
parentJobId, expectedSequence).marshal();
assertThat(checkJobConfig.getJobId(), is(expectCheckJobId));
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index e70b05acc01..3e67089680a 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -150,7 +150,7 @@ class MigrationJobAPITest {
void assertRollback() throws SQLException {
Optional<String> jobId =
jobManager.start(JobConfigurationBuilder.createJobConfiguration());
assertTrue(jobId.isPresent());
- MigrationJobConfiguration jobConfig = (MigrationJobConfiguration)
jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId.get()));
+ MigrationJobConfiguration jobConfig =
jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId.get()));
initTableData(jobConfig);
PipelineDistributedBarrier mockBarrier =
mock(PipelineDistributedBarrier.class);
when(PipelineDistributedBarrier.getInstance(any())).thenReturn(mockBarrier);
@@ -162,7 +162,7 @@ class MigrationJobAPITest {
void assertCommit() {
Optional<String> jobId =
jobManager.start(JobConfigurationBuilder.createJobConfiguration());
assertTrue(jobId.isPresent());
- MigrationJobConfiguration jobConfig = (MigrationJobConfiguration)
jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId.get()));
+ MigrationJobConfiguration jobConfig =
jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId.get()));
initTableData(jobConfig);
PipelineDistributedBarrier mockBarrier =
mock(PipelineDistributedBarrier.class);
when(PipelineDistributedBarrier.getInstance(any())).thenReturn(mockBarrier);
@@ -285,7 +285,7 @@ class MigrationJobAPITest {
initIntPrimaryEnvironment();
SourceTargetEntry sourceTargetEntry = new
SourceTargetEntry("logic_db", new DataNode("ds_0", "t_order"), "t_order");
String jobId =
jobAPI.createJobAndStart(PipelineContextUtils.getContextKey(), new
MigrateTableStatement(Collections.singletonList(sourceTargetEntry),
"logic_db"));
- MigrationJobConfiguration actual = (MigrationJobConfiguration)
jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
+ MigrationJobConfiguration actual =
jobManager.getJobConfiguration(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
assertThat(actual.getTargetDatabaseName(), is("logic_db"));
List<JobDataNodeLine> dataNodeLines = actual.getJobShardingDataNodes();
assertThat(dataNodeLines.size(), is(1));