This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e28038a846 Add job sharding info to migration list (#35053)
9e28038a846 is described below

commit 9e28038a846dd7829d8e33c67178edbb8c1ebebb
Author: Raigor <[email protected]>
AuthorDate: Fri Mar 21 11:15:11 2025 +0800

    Add job sharding info to migration list (#35053)
    
    * Add job sharding info to migration list
    
    * Add job sharding nodes to the result set of `SHOW MIGRATION LIST`
---
 RELEASE-NOTES.md                                   |  1 +
 .../syntax/ral/migration/show-migration-list.cn.md | 29 +++++++++++-----------
 .../syntax/ral/migration/show-migration-list.en.md | 29 +++++++++++-----------
 .../pipeline/core/job/api/PipelineAPIFactory.java  | 15 +++++++++++
 .../core/job/service/PipelineJobManager.java       | 17 +++++++++++++
 .../migration/query/ShowMigrationListExecutor.java | 24 +++++++++++++++---
 6 files changed, 84 insertions(+), 31 deletions(-)

diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index beb32e75200..640c69001ab 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -28,6 +28,7 @@
 1. Encrypt: Use EncryptDerivedColumnSuffix to enhance encrypt table subquery 
rewrite logic - [#34829](https://github.com/apache/shardingsphere/pull/34829)
 1. Encrypt: Add quotes to encrypt rewrite derived columns - 
[#34950](https://github.com/apache/shardingsphere/pull/34950)
 1. SQL Router: Add check for select with union all routing to multi data 
sources - [#35037](https://github.com/apache/shardingsphere/pull/35037)
+1. DistSQL: Add job sharding nodes to the result set of `SHOW MIGRATION LIST` 
- [#35053](https://github.com/apache/shardingsphere/pull/35053)
 
 ### Bug Fixes
 
diff --git 
a/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/ral/migration/show-migration-list.cn.md
 
b/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/ral/migration/show-migration-list.cn.md
index 17ed7188a96..3d37c3752fb 100644
--- 
a/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/ral/migration/show-migration-list.cn.md
+++ 
b/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/ral/migration/show-migration-list.cn.md
@@ -24,14 +24,15 @@ ShowMigrationList ::=
 
 ### 返回值说明
 
-| 列              | 说明         |
-|----------------|------------|
-| id             | 数据迁移作业ID   |
-| tables         | 迁移表        |
-| job_item_count | 数据迁移作业分片数量 |
-| active         | 数据迁移作业状态   |
-| create_time    | 数据迁移作业创建时间 |
-| stop_time      | 数据迁移作业停止时间 |
+| 列              | 说明           |
+|----------------|--------------|
+| id             | 数据迁移作业ID     |
+| tables         | 迁移表          |
+| active         | 数据迁移作业状态     |
+| create_time    | 数据迁移作业创建时间   |
+| stop_time      | 数据迁移作业停止时间   |
+| job_item_count | 数据迁移作业分片数量   |
+| job_sharding_nodes | 数据迁移作业分片运行节点 |
 
 ### 示例
 
@@ -43,12 +44,12 @@ SHOW MIGRATION LIST;
 
 ```sql
 mysql> SHOW MIGRATION LIST;
-+---------------------------------------+---------+----------------+--------+---------------------+---------------------+
-| id                                    | tables  | job_item_count | active | 
create_time         | stop_time           |
-+---------------------------------------+---------+----------------+--------+---------------------+---------------------+
-| j01013a38b0184e07c864627b5bb05da09ee0 | t_order | 1              | false  | 
2022-10-31 18:18:24 | 2022-10-31 18:18:31 |
-+---------------------------------------+---------+----------------+--------+---------------------+---------------------+
-1 row in set (0.28 sec)
++--------------------------------------------+---------------------+--------+---------------------+-----------+----------------+--------------------+
+| id                                         | tables              | active | 
create_time         | stop_time | job_item_count | job_sharding_nodes |
++--------------------------------------------+---------------------+--------+---------------------+-----------+----------------+--------------------+
+| j0102p00001d029afca1fd960d567fed6cddc9b4a2 | source_ds.t_order   | true   | 
2022-10-31 18:18:24 |           | 1              | 10.7.5.76@-@27808  |
++--------------------------------------------+---------------------+--------+---------------------+-----------+----------------+--------------------+
+4 rows in set (0.06 sec)
 ```
 
 ### 保留字
diff --git 
a/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/ral/migration/show-migration-list.en.md
 
b/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/ral/migration/show-migration-list.en.md
index e30f6cffc8b..80ed0773792 100644
--- 
a/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/ral/migration/show-migration-list.en.md
+++ 
b/docs/document/content/user-manual/shardingsphere-proxy/distsql/syntax/ral/migration/show-migration-list.en.md
@@ -23,14 +23,15 @@ ShowMigrationList ::=
 
 ### Return Values Description
 
-| Columns        | Description                    |
-|----------------|--------------------------------|
-| id             | migration job id               |
-| tables         | migration tables               |
-| job_item_count | migration job sharding number  |
-| active         | migration job states           |
-| create_time    | migration job create time      |
-| stop_time      | migration job stop time        |
+| Columns            | Description                   |
+|--------------------|-------------------------------|
+| id                 | migration job id              |
+| tables             | migration tables              |
+| job_item_count     | migration job sharding number |
+| active             | migration job states          |
+| create_time        | migration job create time     |
+| stop_time          | migration job stop time       |
+| job_sharding_nodes | migration job sharding nodes  |
 
 ### Example
 
@@ -42,12 +43,12 @@ SHOW MIGRATION LIST;
 
 ```sql
 mysql> SHOW MIGRATION LIST;
-+---------------------------------------+---------+----------------+--------+---------------------+---------------------+
-| id                                    | tables  | job_item_count | active | 
create_time         | stop_time           |
-+---------------------------------------+---------+----------------+--------+---------------------+---------------------+
-| j01013a38b0184e07c864627b5bb05da09ee0 | t_order | 1              | false  | 
2022-10-31 18:18:24 | 2022-10-31 18:18:31 |
-+---------------------------------------+---------+----------------+--------+---------------------+---------------------+
-1 row in set (0.28 sec)
++--------------------------------------------+---------------------+--------+---------------------+-----------+----------------+--------------------+
+| id                                         | tables              | active | 
create_time         | stop_time | job_item_count | job_sharding_nodes |
++--------------------------------------------+---------------------+--------+---------------------+-----------+----------------+--------------------+
+| j0102p00001d029afca1fd960d567fed6cddc9b4a2 | source_ds.t_order   | true   | 
2022-10-31 18:18:24 |           | 1              | 10.7.5.76@-@27808  |
++--------------------------------------------+---------------------+--------+---------------------+-----------+----------------+--------------------+
+4 rows in set (0.06 sec)
 ```
 
 ### Reserved word
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/api/PipelineAPIFactory.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/api/PipelineAPIFactory.java
index a941c8a3cc7..eba438ee6a4 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/api/PipelineAPIFactory.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/api/PipelineAPIFactory.java
@@ -31,9 +31,11 @@ import 
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.Pi
 import 
org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobConfigurationAPI;
 import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobOperateAPI;
 import 
org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobStatisticsAPI;
+import 
org.apache.shardingsphere.elasticjob.lite.lifecycle.api.ShardingStatisticsAPI;
 import 
org.apache.shardingsphere.elasticjob.lite.lifecycle.internal.operate.JobOperateAPIImpl;
 import 
org.apache.shardingsphere.elasticjob.lite.lifecycle.internal.settings.JobConfigurationAPIImpl;
 import 
org.apache.shardingsphere.elasticjob.lite.lifecycle.internal.statistics.JobStatisticsAPIImpl;
+import 
org.apache.shardingsphere.elasticjob.lite.lifecycle.internal.statistics.ShardingStatisticsAPIImpl;
 import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
 import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
@@ -99,6 +101,16 @@ public final class PipelineAPIFactory {
         return ElasticJobAPIHolder.getInstance(contextKey).jobOperateAPI;
     }
     
+    /**
+     * Get sharding statistics API.
+     *
+     * @param contextKey context key
+     * @return sharding statistics API
+     */
+    public static ShardingStatisticsAPI getShardingStatisticsAPI(final 
PipelineContextKey contextKey) {
+        return 
ElasticJobAPIHolder.getInstance(contextKey).shardingStatisticsAPI;
+    }
+    
     /**
      * Get registry center.
      *
@@ -119,11 +131,14 @@ public final class PipelineAPIFactory {
         
         private final JobOperateAPI jobOperateAPI;
         
+        private final ShardingStatisticsAPI shardingStatisticsAPI;
+        
         private ElasticJobAPIHolder(final PipelineContextKey contextKey) {
             CoordinatorRegistryCenter registryCenter = 
getRegistryCenter(contextKey);
             jobStatisticsAPI = new JobStatisticsAPIImpl(registryCenter);
             jobConfigurationAPI = new JobConfigurationAPIImpl(registryCenter);
             jobOperateAPI = new JobOperateAPIImpl(registryCenter);
+            shardingStatisticsAPI = new 
ShardingStatisticsAPIImpl(registryCenter);
         }
         
         public static ElasticJobAPIHolder getInstance(final PipelineContextKey 
contextKey) {
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index 581523c0821..c9d11ea3a7d 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -32,11 +32,13 @@ import 
org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobInfo;
 import 
org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.PipelineGovernanceFacade;
 import 
org.apache.shardingsphere.data.pipeline.core.util.PipelineDistributedBarrier;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.ShardingInfo;
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.infra.util.datetime.DateTimeFormatterFactory;
 
 import java.time.LocalDateTime;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
@@ -173,4 +175,19 @@ public final class PipelineJobManager {
             return Collections.emptyList();
         }
     }
+    
+    /**
+     * Get pipeline job sharding info.
+     *
+     * @param contextKey context key
+     * @param jobId job id
+     * @return job sharding info
+     */
+    public Collection<ShardingInfo> getJobShardingInfos(final 
PipelineContextKey contextKey, final String jobId) {
+        try {
+            return 
PipelineAPIFactory.getShardingStatisticsAPI(contextKey).getShardingInfo(jobId);
+        } catch (final NullPointerException | UnsupportedOperationException 
ex) {
+            return Collections.emptyList();
+        }
+    }
 }
diff --git 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/distsql/handler/migration/query/ShowMigrationListExecutor.java
 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/distsql/handler/migration/query/ShowMigrationListExecutor.java
index b90398420c0..fc57e46c7a8 100644
--- 
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/distsql/handler/migration/query/ShowMigrationListExecutor.java
+++ 
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/data/pipeline/distsql/handler/migration/query/ShowMigrationListExecutor.java
@@ -19,9 +19,11 @@ package 
org.apache.shardingsphere.data.pipeline.distsql.handler.migration.query;
 
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
+import org.apache.shardingsphere.data.pipeline.core.pojo.PipelineJobInfo;
 import 
org.apache.shardingsphere.data.pipeline.migration.distsql.statement.queryable.ShowMigrationListStatement;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import 
org.apache.shardingsphere.distsql.handler.engine.query.DistSQLQueryExecutor;
+import org.apache.shardingsphere.elasticjob.lite.lifecycle.domain.ShardingInfo;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
 import 
org.apache.shardingsphere.infra.merge.result.impl.local.LocalDataQueryResultRow;
 import org.apache.shardingsphere.mode.manager.ContextManager;
@@ -39,13 +41,29 @@ public final class ShowMigrationListExecutor implements 
DistSQLQueryExecutor<Sho
     
     @Override
     public Collection<String> getColumnNames(final ShowMigrationListStatement 
sqlStatement) {
-        return Arrays.asList("id", "tables", "job_item_count", "active", 
"create_time", "stop_time");
+        return Arrays.asList("id", "tables", "active", "create_time", 
"stop_time", "job_item_count", "job_sharding_nodes");
     }
     
     @Override
     public Collection<LocalDataQueryResultRow> getRows(final 
ShowMigrationListStatement sqlStatement, final ContextManager contextManager) {
-        return pipelineJobManager.getJobInfos(new 
PipelineContextKey(InstanceType.PROXY)).stream().map(each -> new 
LocalDataQueryResultRow(each.getJobMetaData().getJobId(), each.getTableName(),
-                each.getJobMetaData().getJobItemCount(), 
each.getJobMetaData().isActive(), each.getJobMetaData().getCreateTime(), 
each.getJobMetaData().getStopTime())).collect(Collectors.toList());
+        PipelineContextKey contextKey = new 
PipelineContextKey(InstanceType.PROXY);
+        return pipelineJobManager.getJobInfos(contextKey).stream().map(each -> 
getRow(contextKey, each)).collect(Collectors.toList());
+    }
+    
+    private LocalDataQueryResultRow getRow(final PipelineContextKey 
contextKey, final PipelineJobInfo jobInfo) {
+        return new 
LocalDataQueryResultRow(jobInfo.getJobMetaData().getJobId(), 
jobInfo.getTableName(), jobInfo.getJobMetaData().isActive(), 
jobInfo.getJobMetaData().getCreateTime(),
+                jobInfo.getJobMetaData().getStopTime(), 
jobInfo.getJobMetaData().getJobItemCount(), getJobShardingNodes(contextKey, 
jobInfo.getJobMetaData().getJobId()));
+    }
+    
+    private String getJobShardingNodes(final PipelineContextKey contextKey, 
final String jobId) {
+        Collection<ShardingInfo> shardingInfos = 
pipelineJobManager.getJobShardingInfos(contextKey, jobId);
+        return shardingInfos.isEmpty() ? "" : 
getJobShardingNodes(shardingInfos);
+    }
+    
+    private String getJobShardingNodes(final Collection<ShardingInfo> 
shardingInfos) {
+        return 1 == shardingInfos.size()
+                ? shardingInfos.iterator().next().getInstanceId()
+                : shardingInfos.stream().map(each -> each.getItem() + "=" + 
each.getInstanceId()).collect(Collectors.joining(","));
     }
     
     @Override

Reply via email to