This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new f8a420f2f55 Move PipelineJobAPI.marshalJobId() to 
PipelineJobId.marshal() (#29011)
f8a420f2f55 is described below

commit f8a420f2f55a371801796e6331af2397ad9941cb
Author: Liang Zhang <[email protected]>
AuthorDate: Sun Nov 12 08:53:27 2023 +0800

    Move PipelineJobAPI.marshalJobId() to PipelineJobId.marshal() (#29011)
---
 .../pipeline/core/job/service/PipelineJobAPI.java  |  9 ++--
 .../core/job/service/PipelineJobManager.java       | 52 ++++++++++++++++++++++
 .../AbstractInventoryIncrementalJobAPIImpl.java    |  5 +--
 .../service/impl/AbstractPipelineJobAPIImpl.java   | 18 --------
 .../handler/query/ShowStreamingListExecutor.java   |  5 ++-
 .../handler/query/ShowMigrationListExecutor.java   |  5 ++-
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java      |  2 +-
 .../api/impl/ConsistencyCheckJobAPI.java           |  2 +-
 .../migration/api/impl/MigrationJobAPI.java        |  2 +-
 9 files changed, 66 insertions(+), 34 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
index ee0b59f54e2..59c4a42fb7a 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
@@ -30,7 +30,6 @@ import 
org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConf
 import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
 
-import java.util.List;
 import java.util.Optional;
 
 /**
@@ -97,11 +96,11 @@ public interface PipelineJobAPI extends TypedSPI {
     
     /**
      * Get pipeline job info.
-     *
-     * @param contextKey context key
-     * @return job info list
+     * 
+     * @param jobId job ID
+     * @return pipeline job info
      */
-    List<PipelineJobInfo> list(PipelineContextKey contextKey);
+    PipelineJobInfo getJobInfo(String jobId);
     
     /**
      * Persist job item progress.
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
new file mode 100644
index 00000000000..55a4a6b3337
--- /dev/null
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.job.service;
+
+import lombok.RequiredArgsConstructor;
+import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
+import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
+import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
+import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Pipeline job manager.
+ */
+@RequiredArgsConstructor
+public final class PipelineJobManager {
+    
+    private final PipelineJobAPI pipelineJobAPI;
+    
+    /**
+     * Get pipeline jobs info.
+     *
+     * @param contextKey context key
+     * @return jobs info
+     */
+    public List<PipelineJobInfo> getPipelineJobInfos(final PipelineContextKey 
contextKey) {
+        return getJobBriefInfos(contextKey, pipelineJobAPI.getType()).map(each 
-> pipelineJobAPI.getJobInfo(each.getJobName())).collect(Collectors.toList());
+    }
+    
+    private Stream<JobBriefInfo> getJobBriefInfos(final PipelineContextKey 
contextKey, final String jobType) {
+        return 
PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream().filter(each
 -> !each.getJobName().startsWith("_"))
+                .filter(each -> 
jobType.equals(PipelineJobIdUtils.parseJobType(each.getJobName()).getType()));
+    }
+}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
index 8d79fbf093f..da4ead55272 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -88,9 +88,6 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl 
extends AbstractPip
         return 
PipelineProcessConfigurationUtils.convertWithDefaultValue(processConfigPersistService.load(contextKey,
 getType()));
     }
     
-    @Override
-    protected abstract TableBasedPipelineJobInfo getJobInfo(String jobId);
-    
     @Override
     public Map<Integer, InventoryIncrementalJobItemProgress> 
getJobProgress(final PipelineJobConfiguration jobConfig) {
         String jobId = jobConfig.getJobId();
@@ -111,7 +108,7 @@ public abstract class 
AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
         List<InventoryIncrementalJobItemInfo> result = new LinkedList<>();
         for (Entry<Integer, InventoryIncrementalJobItemProgress> entry : 
jobProgress.entrySet()) {
             int shardingItem = entry.getKey();
-            TableBasedPipelineJobInfo jobInfo = getJobInfo(jobId);
+            TableBasedPipelineJobInfo jobInfo = (TableBasedPipelineJobInfo) 
getJobInfo(jobId);
             InventoryIncrementalJobItemProgress jobItemProgress = 
entry.getValue();
             String errorMessage = getJobItemErrorMessage(jobId, shardingItem);
             if (null == jobItemProgress) {
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
index 35e9b000d96..5d52a1cfb66 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
@@ -24,7 +24,6 @@ import 
org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipeli
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import 
org.apache.shardingsphere.data.pipeline.common.listener.PipelineElasticJobListener;
 import 
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
-import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
 import 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
 import 
org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier;
@@ -35,18 +34,14 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
-import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.JobBriefInfo;
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.Collections;
-import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 /**
  * Abstract pipeline job API impl.
@@ -56,19 +51,6 @@ public abstract class AbstractPipelineJobAPIImpl implements 
PipelineJobAPI {
     
     protected static final DateTimeFormatter DATE_TIME_FORMATTER = 
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
     
-    @Override
-    public List<PipelineJobInfo> list(final PipelineContextKey contextKey) {
-        return getJobBriefInfos(contextKey).map(each -> 
getJobInfo(each.getJobName())).collect(Collectors.toList());
-    }
-    
-    private Stream<JobBriefInfo> getJobBriefInfos(final PipelineContextKey 
contextKey) {
-        return 
PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream().filter(each
 -> !each.getJobName().startsWith("_"))
-                .filter(each -> 
PipelineJobIdUtils.parseJobType(each.getJobName()).getType().equals(getType()));
-    }
-    
-    // TODO Add getJobInfo
-    protected abstract PipelineJobInfo getJobInfo(String jobId);
-    
     protected PipelineJobMetaData buildPipelineJobMetaData(final 
JobConfigurationPOJO jobConfigPOJO) {
         return new PipelineJobMetaData(jobConfigPOJO.getJobName(), 
!jobConfigPOJO.isDisabled(),
                 jobConfigPOJO.getShardingTotalCount(), 
jobConfigPOJO.getProps().getProperty("create_time"), 
jobConfigPOJO.getProps().getProperty("stop_time"), 
jobConfigPOJO.getJobParameter());
diff --git 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java
 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java
index fe9676f5119..f6ca1ea172a 100644
--- 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java
+++ 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingListExecutor.java
@@ -21,6 +21,7 @@ import 
org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingListStatemen
 import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import 
org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import 
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
 import 
org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
@@ -35,11 +36,11 @@ import java.util.stream.Collectors;
  */
 public final class ShowStreamingListExecutor implements 
QueryableRALExecutor<ShowStreamingListStatement> {
     
-    private final CDCJobAPI jobAPI = new CDCJobAPI();
+    private final PipelineJobManager pipelineJobManager = new 
PipelineJobManager(new CDCJobAPI());
     
     @Override
     public Collection<LocalDataQueryResultRow> getRows(final 
ShowStreamingListStatement sqlStatement) {
-        return jobAPI.list(new 
PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new 
LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
+        return pipelineJobManager.getPipelineJobInfos(new 
PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new 
LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
                 ((TableBasedPipelineJobInfo) each).getDatabaseName(), 
((TableBasedPipelineJobInfo) each).getTable(),
                 each.getJobMetaData().getJobItemCount(), 
each.getJobMetaData().isActive() ? Boolean.TRUE.toString() : 
Boolean.FALSE.toString(),
                 each.getJobMetaData().getCreateTime(), 
Optional.ofNullable(each.getJobMetaData().getStopTime()).orElse(""))).collect(Collectors.toList());
diff --git 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListExecutor.java
 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListExecutor.java
index 94585e89753..a731a088a97 100644
--- 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListExecutor.java
+++ 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationListExecutor.java
@@ -19,6 +19,7 @@ package 
org.apache.shardingsphere.migration.distsql.handler.query;
 
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import 
org.apache.shardingsphere.data.pipeline.common.pojo.TableBasedPipelineJobInfo;
+import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
 import 
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
@@ -34,11 +35,11 @@ import java.util.stream.Collectors;
  */
 public final class ShowMigrationListExecutor implements 
QueryableRALExecutor<ShowMigrationListStatement> {
     
-    private final MigrationJobAPI jobAPI = new MigrationJobAPI();
+    private final PipelineJobManager pipelineJobManager = new 
PipelineJobManager(new MigrationJobAPI());
     
     @Override
     public Collection<LocalDataQueryResultRow> getRows(final 
ShowMigrationListStatement sqlStatement) {
-        return jobAPI.list(new 
PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new 
LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
+        return pipelineJobManager.getPipelineJobInfos(new 
PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new 
LocalDataQueryResultRow(each.getJobMetaData().getJobId(),
                 ((TableBasedPipelineJobInfo) each).getTable(), 
each.getJobMetaData().getJobItemCount(),
                 each.getJobMetaData().isActive() ? Boolean.TRUE.toString() : 
Boolean.FALSE.toString(),
                 each.getJobMetaData().getCreateTime(), 
each.getJobMetaData().getStopTime())).collect(Collectors.toList());
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 81942ca429e..88dabfe855e 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -300,7 +300,7 @@ public final class CDCJobAPI extends 
AbstractInventoryIncrementalJobAPIImpl {
     }
     
     @Override
-    protected TableBasedPipelineJobInfo getJobInfo(final String jobId) {
+    public TableBasedPipelineJobInfo getJobInfo(final String jobId) {
         JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
         PipelineJobMetaData jobMetaData = 
buildPipelineJobMetaData(jobConfigPOJO);
         CDCJobConfiguration jobConfig = getJobConfiguration(jobConfigPOJO);
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index 2f8f80c5bbd..7a894858c80 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -385,7 +385,7 @@ public final class ConsistencyCheckJobAPI extends 
AbstractPipelineJobAPIImpl {
     }
     
     @Override
-    protected PipelineJobInfo getJobInfo(final String jobId) {
+    public PipelineJobInfo getJobInfo(final String jobId) {
         throw new UnsupportedOperationException();
     }
     
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 6605a1218ae..7477ebcf612 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -209,7 +209,7 @@ public final class MigrationJobAPI extends 
AbstractInventoryIncrementalJobAPIImp
     }
     
     @Override
-    protected TableBasedPipelineJobInfo getJobInfo(final String jobId) {
+    public TableBasedPipelineJobInfo getJobInfo(final String jobId) {
         JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
         PipelineJobMetaData jobMetaData = 
buildPipelineJobMetaData(jobConfigPOJO);
         List<String> sourceTables = new LinkedList<>();

Reply via email to