This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 1cfdd1bc485 Add PipelineJobMetaData (#21457)
1cfdd1bc485 is described below
commit 1cfdd1bc485404a981720e3bf823538d572616a9
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Oct 10 18:04:36 2022 +0800
Add PipelineJobMetaData (#21457)
---
.../data/pipeline/api/pojo/PipelineJobInfo.java | 28 +++++++---------------
...pelineJobInfo.java => PipelineJobMetaData.java} | 4 ++--
.../api/pojo/TableBasedPipelineJobInfo.java | 6 +++--
.../core/api/impl/AbstractPipelineJobAPIImpl.java | 13 +++++-----
.../scenario/migration/MigrationJobAPIImpl.java | 2 +-
.../core/api/impl/MigrationJobAPIImplTest.java | 12 +++++-----
6 files changed, 28 insertions(+), 37 deletions(-)
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/PipelineJobInfo.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/PipelineJobInfo.java
index 8aed52442b2..99fb2051edc 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/PipelineJobInfo.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/PipelineJobInfo.java
@@ -17,27 +17,15 @@
package org.apache.shardingsphere.data.pipeline.api.pojo;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import lombok.Setter;
-
/**
- * Pipeline Job Info.
+ * Pipeline job meta data.
*/
-@RequiredArgsConstructor
-@Getter
-@Setter
-public abstract class PipelineJobInfo {
-
- private final String jobId;
-
- private boolean active;
-
- private int shardingTotalCount;
-
- private String createTime;
-
- private String stopTime;
+public interface PipelineJobInfo {
- private transient String jobParameter;
+ /**
+ * Get job meta data.
+ *
+ * @return job meta data
+ */
+ PipelineJobMetaData getJobMetaData();
}
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/PipelineJobInfo.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/PipelineJobMetaData.java
similarity index 94%
copy from
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/PipelineJobInfo.java
copy to
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/PipelineJobMetaData.java
index 8aed52442b2..672b305294c 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/PipelineJobInfo.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/PipelineJobMetaData.java
@@ -22,12 +22,12 @@ import lombok.RequiredArgsConstructor;
import lombok.Setter;
/**
- * Pipeline Job Info.
+ * Pipeline job meta data.
*/
@RequiredArgsConstructor
@Getter
@Setter
-public abstract class PipelineJobInfo {
+public final class PipelineJobMetaData {
private final String jobId;
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
index 283e1d45030..2fc2aaa7104 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/pojo/TableBasedPipelineJobInfo.java
@@ -27,11 +27,13 @@ import lombok.ToString;
@Getter
@Setter
@ToString(callSuper = true)
-public final class TableBasedPipelineJobInfo extends PipelineJobInfo {
+public final class TableBasedPipelineJobInfo implements PipelineJobInfo {
+
+ private final PipelineJobMetaData jobMetaData;
private String table;
public TableBasedPipelineJobInfo(final String jobId) {
- super(jobId);
+ jobMetaData = new PipelineJobMetaData(jobId);
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
index c7de841ad14..6f21a3e8e73 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/AbstractPipelineJobAPIImpl.java
@@ -24,6 +24,7 @@ import org.apache.commons.lang3.ObjectUtils;
import
org.apache.shardingsphere.data.pipeline.api.config.job.PipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.config.job.yaml.YamlPipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.api.job.PipelineJobId;
+import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobMetaData;
import org.apache.shardingsphere.data.pipeline.api.pojo.PipelineJobInfo;
import
org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
@@ -85,12 +86,12 @@ public abstract class AbstractPipelineJobAPIImpl implements
PipelineJobAPI {
protected abstract PipelineJobInfo getJobInfo(String jobId);
- protected void fillJobInfo(final PipelineJobInfo jobInfo, final
JobConfigurationPOJO jobConfigPOJO) {
- jobInfo.setActive(!jobConfigPOJO.isDisabled());
- jobInfo.setShardingTotalCount(jobConfigPOJO.getShardingTotalCount());
-
jobInfo.setCreateTime(jobConfigPOJO.getProps().getProperty("create_time"));
- jobInfo.setStopTime(jobConfigPOJO.getProps().getProperty("stop_time"));
- jobInfo.setJobParameter(jobConfigPOJO.getJobParameter());
+ protected void fillJobMetaData(final PipelineJobMetaData jobMetaData,
final JobConfigurationPOJO jobConfigPOJO) {
+ jobMetaData.setActive(!jobConfigPOJO.isDisabled());
+
jobMetaData.setShardingTotalCount(jobConfigPOJO.getShardingTotalCount());
+
jobMetaData.setCreateTime(jobConfigPOJO.getProps().getProperty("create_time"));
+
jobMetaData.setStopTime(jobConfigPOJO.getProps().getProperty("stop_time"));
+ jobMetaData.setJobParameter(jobConfigPOJO.getJobParameter());
}
@Override
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
index b2c8091930f..63fc1833583 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobAPIImpl.java
@@ -126,7 +126,7 @@ public final class MigrationJobAPIImpl extends
AbstractInventoryIncrementalJobAP
protected TableBasedPipelineJobInfo getJobInfo(final String jobId) {
TableBasedPipelineJobInfo result = new
TableBasedPipelineJobInfo(jobId);
JobConfigurationPOJO jobConfigPOJO = getElasticJobConfigPOJO(jobId);
- fillJobInfo(result, jobConfigPOJO);
+ fillJobMetaData(result.getJobMetaData(), jobConfigPOJO);
result.setTable(getJobConfiguration(jobConfigPOJO).getSourceTableName());
return result;
}
diff --git
a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
index d5ae171c579..f8dd4794b7b 100644
---
a/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
+++
b/test/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/api/impl/MigrationJobAPIImplTest.java
@@ -94,13 +94,13 @@ public final class MigrationJobAPIImplTest {
Optional<String> jobId =
jobAPI.start(JobConfigurationBuilder.createJobConfiguration());
assertTrue(jobId.isPresent());
PipelineJobInfo jobInfo = getNonNullJobInfo(jobId.get());
- assertTrue(jobInfo.isActive());
+ assertTrue(jobInfo.getJobMetaData().isActive());
assertThat(((TableBasedPipelineJobInfo) jobInfo).getTable(),
is("t_order"));
- assertThat(jobInfo.getShardingTotalCount(), is(1));
+ assertThat(jobInfo.getJobMetaData().getShardingTotalCount(), is(1));
}
private Optional<? extends PipelineJobInfo> getJobInfo(final String jobId)
{
- return jobAPI.list().stream().filter(each ->
Objects.equals(each.getJobId(), jobId)).reduce((a, b) -> a);
+ return jobAPI.list().stream().filter(each ->
Objects.equals(each.getJobMetaData().getJobId(), jobId)).reduce((a, b) -> a);
}
private PipelineJobInfo getNonNullJobInfo(final String jobId) {
@@ -113,11 +113,11 @@ public final class MigrationJobAPIImplTest {
public void assertStartOrStopById() {
Optional<String> jobId =
jobAPI.start(JobConfigurationBuilder.createJobConfiguration());
assertTrue(jobId.isPresent());
- assertTrue(getNonNullJobInfo(jobId.get()).isActive());
+ assertTrue(getNonNullJobInfo(jobId.get()).getJobMetaData().isActive());
jobAPI.stop(jobId.get());
- assertFalse(getNonNullJobInfo(jobId.get()).isActive());
+
assertFalse(getNonNullJobInfo(jobId.get()).getJobMetaData().isActive());
jobAPI.startDisabledJob(jobId.get());
- assertTrue(getNonNullJobInfo(jobId.get()).isActive());
+ assertTrue(getNonNullJobInfo(jobId.get()).getJobMetaData().isActive());
}
@Test