This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new f8a420f2f55 Move PipelineJobAPI.marshalJobId() to
PipelineJobId.marshal() (#29011)
f8a420f2f55 is described below
commit f8a420f2f55a371801796e6331af2397ad9941cb
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Nov 12 08:53:27 2023 +0800
Move PipelineJobAPI.marshalJobId() to PipelineJobId.marshal() (#29011)
---
.../pipeline/core/job/service/PipelineJobAPI.java | 9 ++--
.../core/job/service/PipelineJobManager.java | 52 ++++++++++++++++++++++
.../AbstractInventoryIncrementalJobAPIImpl.java | 5 +--
.../service/impl/AbstractPipelineJobAPIImpl.java | 18 --------
.../handler/query/ShowStreamingListExecutor.java | 5 ++-
.../handler/query/ShowMigrationListExecutor.java | 5 ++-
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 2 +-
.../api/impl/ConsistencyCheckJobAPI.java | 2 +-
.../migration/api/impl/MigrationJobAPI.java | 2 +-
9 files changed, 66 insertions(+), 34 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
index ee0b59f54e2..59c4a42fb7a 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
@@ -30,7 +30,6 @@ import
org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConf
import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
-import java.util.List;
import java.util.Optional;
/**
@@ -97,11 +96,11 @@ public interface PipelineJobAPI extends TypedSPI {
/**
* Get pipeline job info.
- *
- * @param contextKey context key
- * @return job info list
+ *
+ * @param jobId job ID
+ * @return pipeline job info
*/
- List<PipelineJobInfo> list(PipelineContextKey contextKey);
+ PipelineJobInfo getJobInfo(String jobId);
/**
* Persist job item progress.
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
new file mode 100644
index 00000000000..55a4a6b3337
--- /dev/null
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.job.service;
+
+import lombok.RequiredArgsConstructor;
+import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
+import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Pipeline job manager, lists the jobs whose type equals the type of the wrapped pipeline job API.
+ */
+@RequiredArgsConstructor
+public final class PipelineJobManager {
+
+ private final PipelineJobAPI pipelineJobAPI;
+
+ /**
+ * Get pipeline jobs info of every job whose parsed type equals the wrapped job API type, skipping job names starting with "_".
+ *
+ * @param contextKey context key used to obtain the job statistics API
+ * @return jobs info, one per matching job name, each built by the wrapped job API
+ */
+ public List<PipelineJobInfo> getPipelineJobInfos(final PipelineContextKey
contextKey) {
+ return getJobBriefInfos(contextKey, pipelineJobAPI.getType()).map(each
-> pipelineJobAPI.getJobInfo(each.getJobName())).collect(Collectors.toList());
+ }
+
+ private Stream<JobBriefInfo> getJobBriefInfos(final PipelineContextKey
contextKey, final String jobType) {
+ return
PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream().filter(each
-> !each.getJobName().startsWith("_"))
+ .filter(each ->
jobType.equals(PipelineJobIdUtils.parseJobType(each.getJobName()).getType()));
+ }
+}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
index 8d79fbf093f..da4ead55272 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -88,9 +88,6 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl
extends AbstractPip
return
PipelineProcessConfigurationUtils.convertWithDefaultValue(processConfigPersistService.load(contextKey,
getType()));
}
- @Override
- protected abstract TableBasedPipelineJobInfo getJobInfo(String jobId);
-
@Override
public Map<Integer, InventoryIncrementalJobItemProgress>
getJobProgress(final PipelineJobConfiguration jobConfig) {
String jobId = jobConfig.getJobId();
@@ -111,7 +108,7 @@ public abstract class
AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
List<InventoryIncrementalJobItemInfo> result = new LinkedList<>();
for (Entry<Integer, InventoryIncrementalJobItemProgress> entry :
jobProgress.entrySet()) {
int shardingItem = entry.getKey();
- TableBasedPipelineJobInfo jobInfo = getJobInfo(jobId);
+ TableBasedPipelineJobInfo jobInfo = (TableBasedPipelineJobInfo)
getJobInfo(jobId);
InventoryIncrementalJobItemProgress jobItemProgress =
entry.getValue();
String errorMessage = getJobItemErrorMessage(jobId, shardingItem);
if (null == jobItemProgress) {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
index 35e9b000d96..5d52a1cfb66 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
@@ -24,7 +24,6 @@ import
org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipeli
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import
org.apache.shardingsphere.data.pipeline.common.listener.PipelineElasticJobListener;
import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
-import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
import
org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier;
@@ -35,18 +34,14 @@ import
org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
-import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
-import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
/**
* Abstract pipeline job API impl.
@@ -56,19 +51,6 @@ public abstract class AbstractPipelineJobAPIImpl implements
PipelineJobAPI {
protected static final DateTimeFormatter DATE_TIME_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
- @Override
- public List<PipelineJobInfo> list(final PipelineContextKey contextKey) {
- return getJobBriefInfos(contextKey).map(each ->
getJobInfo(each.getJobName())).collect(Collectors.toList());
- }
-
- private Stream<JobBriefInfo> getJobBriefInfos(final PipelineContextKey
contextKey) {
- return
PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream().filter(each
-> !each.getJobName().startsWith("_"))
- .filter(each ->
PipelineJobIdUtils.parseJobType(each.getJobName()).getType().equals(getType()));
- }
-
- // TODO Add getJobInfo
- protected abstract PipelineJobInfo getJobInfo(String jobId);
-
protected PipelineJobMetaData buildPipelineJobMetaData(final
JobConfigurationPOJO jobConfigPOJO) {
return new PipelineJobMetaData(jobConfigPOJO.getJobName(),
!jobConfigPOJO.isDisabled(),
jobConfigPOJO.getShardingTotalCount(),
jobConfigPOJO.getProps().getProperty("create_time"),
jobConfigPOJO.getProps().getProperty("stop_time"),
jobConfigPOJO.getJobParameter());
diff --git
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java
index fe9676f5119..f6ca1ea172a 100644
---
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java
+++
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java
@@ -21,6 +21,7 @@ import
org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingListStatemen
import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import
org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import
org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
@@ -35,11 +36,11 @@ import java.util.stream.Collectors;
*/
public final class ShowStreamingListExecutor implements
QueryableRALExecutor<ShowStreamingListStatement> {
- private final CDCJobAPI jobAPI = new CDCJobAPI();
+ private final PipelineJobManager pipelineJobManager = new
PipelineJobManager(new CDCJobAPI());
@Override
public Collection<LocalDataQueryResultRow> getRows(final
ShowStreamingListStatement sqlStatement) {
- return jobAPI.list(new
PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new
LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
+ return pipelineJobManager.getPipelineJobInfos(new
PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new
LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
((TableBasedPipelineJobInfo) each).getDatabaseName(),
((TableBasedPipelineJobInfo) each).getTable(),
each.getJobMetaData().getJobItemCount(),
each.getJobMetaData().isActive() ? Boolean.TRUE.toString() :
Boolean.FALSE.toString(),
each.getJobMetaData().getCreateTime(),
Optional.ofNullable(each.getJobMetaData().getStopTime()).orElse(""))).collect(Collectors.toList());
diff --git
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListExecutor.java
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListExecutor.java
index 94585e89753..a731a088a97 100644
---
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListExecutor.java
+++
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListExecutor.java
@@ -19,6 +19,7 @@ package
org.apache.shardingsphere.migration.distsql.handler.query;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import
org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
+import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
import
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
@@ -34,11 +35,11 @@ import java.util.stream.Collectors;
*/
public final class ShowMigrationListExecutor implements
QueryableRALExecutor<ShowMigrationListStatement> {
- private final MigrationJobAPI jobAPI = new MigrationJobAPI();
+ private final PipelineJobManager pipelineJobManager = new
PipelineJobManager(new MigrationJobAPI());
@Override
public Collection<LocalDataQueryResultRow> getRows(final
ShowMigrationListStatement sqlStatement) {
- return jobAPI.list(new
PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new
LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
+ return pipelineJobManager.getPipelineJobInfos(new
PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new
LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
((TableBasedPipelineJobInfo) each).getTable(),
each.getJobMetaData().getJobItemCount(),
each.getJobMetaData().isActive() ? Boolean.TRUE.toString() :
Boolean.FALSE.toString(),
each.getJobMetaData().getCreateTime(),
each.getJobMetaData().getStopTime())).collect(Collectors.toList());
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 81942ca429e..88dabfe855e 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -300,7 +300,7 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
}
@Override
- protected TableBasedPipelineJobInfo getJobInfo(final String jobId) {
+ public TableBasedPipelineJobInfo getJobInfo(final String jobId) {
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
PipelineJobMetaData jobMetaData =
buildPipelineJobMetaData(jobConfigPOJO);
CDCJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO);
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index 2f8f80c5bbd..7a894858c80 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -385,7 +385,7 @@ public final class ConsistencyCheckJobAPI extends
AbstractPipelineJobAPIImpl {
}
@Override
- protected PipelineJobInfo getJobInfo(final String jobId) {
+ public PipelineJobInfo getJobInfo(final String jobId) {
throw new UnsupportedOperationException();
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 6605a1218ae..7477ebcf612 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -209,7 +209,7 @@ public final class MigrationJobAPI extends
AbstractInventoryIncrementalJobAPIImp
}
@Override
- protected TableBasedPipelineJobInfo getJobInfo(final String jobId) {
+ public TableBasedPipelineJobInfo getJobInfo(final String jobId) {
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
PipelineJobMetaData jobMetaData =
buildPipelineJobMetaData(jobConfigPOJO);
List<String> sourceTables = new LinkedList<>();