This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 077058aac7a Extract JobProgress methods into inventory and incremental
task progress (#19940)
077058aac7a is described below
commit 077058aac7a7536bdc6f9e9f4815c81405942634
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Mon Aug 8 09:49:23 2022 +0800
Extract JobProgress methods into inventory and incremental task progress
(#19940)
---
.../query/ShowScalingJobStatusQueryResultSet.java | 6 +--
.../progress/JobItemIncrementalTasksProgress.java | 9 ++++
.../pipeline/api/job/progress/JobProgress.java | 50 ----------------------
.../rulealtered/RuleAlteredJobPreparer.java | 5 ++-
.../rulealtered/prepare/InventoryTaskSplitter.java | 2 +-
.../core/job/progress/JobProgressTest.java | 32 +++++---------
.../progress/yaml/YamlJobProgressSwapperTest.java | 11 +----
.../test/resources/job-progress-no-finished.yaml | 31 --------------
8 files changed, 28 insertions(+), 118 deletions(-)
diff --git
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShowScalingJobStatusQueryResultSet.java
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShowScalingJobStatusQueryResultSet.java
index 37bfae61842..62ec3670803 100644
---
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShowScalingJobStatusQueryResultSet.java
+++
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-distsql/shardingsphere-sharding-distsql-handler/src/main/java/org/apache/shardingsphere/scaling/distsql/handler/query/ShowScalingJobStatusQueryResultSet.java
@@ -48,11 +48,11 @@ public final class ShowScalingJobStatusQueryResultSet
implements DistSQLResultSe
Collection<Object> result = new LinkedList<>();
result.add(entry.getKey());
if (null != entry.getValue()) {
- result.add(entry.getValue().getDataSource());
+
result.add(entry.getValue().getIncremental().getDataSourceName());
result.add(entry.getValue().getStatus());
result.add(entry.getValue().isActive() ?
Boolean.TRUE.toString() : Boolean.FALSE.toString());
-
result.add(entry.getValue().getInventoryFinishedPercentage());
- long latestActiveTimeMillis =
entry.getValue().getIncrementalLatestActiveTimeMillis();
+
result.add(entry.getValue().getInventory().getInventoryFinishedPercentage());
+ long latestActiveTimeMillis =
entry.getValue().getIncremental().getIncrementalLatestActiveTimeMillis();
result.add(latestActiveTimeMillis > 0 ?
TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis - latestActiveTimeMillis) :
0);
} else {
result.add("");
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobItemIncrementalTasksProgress.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobItemIncrementalTasksProgress.java
index ee4b4efeb2d..b4065d087f6 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobItemIncrementalTasksProgress.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobItemIncrementalTasksProgress.java
@@ -47,6 +47,15 @@ public final class JobItemIncrementalTasksProgress {
return
incrementalTaskProgress.map(IncrementalTaskProgress::getPosition);
}
+ /**
+ * Get data source name.
+ *
+ * @return data source name
+ */
+ public String getDataSourceName() {
+ return
incrementalTaskProgressMap.keySet().stream().findAny().orElse("");
+ }
+
/**
* Get incremental latest active time milliseconds.
*
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobProgress.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobProgress.java
index c210dda4a4e..9a73161dfd9 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobProgress.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/JobProgress.java
@@ -19,12 +19,8 @@ package
org.apache.shardingsphere.data.pipeline.api.job.progress;
import lombok.Getter;
import lombok.Setter;
-import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
-import java.util.Map;
-import java.util.Optional;
-
/**
* Job progress.
*/
@@ -42,50 +38,4 @@ public final class JobProgress implements
PipelineJobItemProgress {
private JobItemInventoryTasksProgress inventory;
private JobItemIncrementalTasksProgress incremental;
-
- /**
- * get incremental position.
- * @param dataSourceName dataSource
- * @return incremental position
- */
- public Optional<IngestPosition<?>> getIncrementalPosition(final String
dataSourceName) {
- return incremental.getIncrementalPosition(dataSourceName);
- }
-
- /**
- * Get inventory position.
- *
- * @param tableName table name
- * @return inventory position
- */
- public Map<String, IngestPosition<?>> getInventoryPosition(final String
tableName) {
- return inventory.getInventoryPosition(tableName);
- }
-
- /**
- * Get data source.
- *
- * @return data source
- */
- public String getDataSource() {
- return
incremental.getIncrementalTaskProgressMap().keySet().stream().findAny().orElse("");
- }
-
- /**
- * Get inventory finished percentage.
- *
- * @return finished percentage
- */
- public int getInventoryFinishedPercentage() {
- return inventory.getInventoryFinishedPercentage();
- }
-
- /**
- * Get incremental latest active time milliseconds.
- *
- * @return latest active time, <code>0</code> is there is no activity
- */
- public long getIncrementalLatestActiveTimeMillis() {
- return null == incremental ? 0L :
incremental.getIncrementalLatestActiveTimeMillis();
- }
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
index 979a33268af..f177417fd32 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/RuleAlteredJobPreparer.java
@@ -200,8 +200,9 @@ public final class RuleAlteredJobPreparer {
private IngestPosition<?> getIncrementalPosition(final
RuleAlteredJobContext jobContext, final TaskConfiguration taskConfig,
final
PipelineDataSourceManager dataSourceManager) throws SQLException {
- if (null != jobContext.getInitProgress()) {
- Optional<IngestPosition<?>> position =
jobContext.getInitProgress().getIncrementalPosition(taskConfig.getDumperConfig().getDataSourceName());
+ JobProgress initProgress = jobContext.getInitProgress();
+ if (null != initProgress) {
+ Optional<IngestPosition<?>> position =
initProgress.getIncremental().getIncrementalPosition(taskConfig.getDumperConfig().getDataSourceName());
if (position.isPresent()) {
return position.get();
}
diff --git
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
index 49d4ffbc738..2d80bee5541 100644
---
a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
+++
b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/prepare/InventoryTaskSplitter.java
@@ -139,7 +139,7 @@ public final class InventoryTaskSplitter {
PipelineTableMetaData tableMetaData =
metaDataLoader.getTableMetaData(schemaName, actualTableName);
PipelineColumnMetaData uniqueKeyColumn =
mustGetAnAppropriateUniqueKeyColumn(tableMetaData, actualTableName);
if (null != initProgress && initProgress.getStatus() !=
JobStatus.PREPARING_FAILURE) {
- Collection<IngestPosition<?>> result =
initProgress.getInventoryPosition(dumperConfig.getActualTableName()).values();
+ Collection<IngestPosition<?>> result =
initProgress.getInventory().getInventoryPosition(dumperConfig.getActualTableName()).values();
for (IngestPosition<?> each : result) {
if (each instanceof PrimaryKeyPosition) {
dumperConfig.setUniqueKey(uniqueKeyColumn.getName());
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/JobProgressTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/JobProgressTest.java
index 1fc6afb8f4a..f93aaba6e44 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/JobProgressTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/JobProgressTest.java
@@ -19,12 +19,12 @@ package
org.apache.shardingsphere.data.pipeline.core.job.progress;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.FinishedPosition;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
+import
org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
-import
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlJobProgressSwapper;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlJobProgress;
+import
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.YamlJobProgressSwapper;
import org.apache.shardingsphere.data.pipeline.core.util.ConfigurationFileUtil;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.junit.Test;
@@ -52,7 +52,7 @@ public final class JobProgressTest {
@Test
public void assertGetIncrementalPosition() {
JobProgress jobProgress =
getJobProgress(ConfigurationFileUtil.readFile("job-progress.yaml"));
- Optional<IngestPosition<?>> position =
jobProgress.getIncrementalPosition("ds0");
+ Optional<IngestPosition<?>> position =
jobProgress.getIncremental().getIncrementalPosition("ds0");
assertTrue(position.isPresent());
assertThat(position.get(), instanceOf(PlaceholderPosition.class));
}
@@ -60,41 +60,31 @@ public final class JobProgressTest {
@Test
public void assertGetInventoryPosition() {
JobProgress jobProgress =
getJobProgress(ConfigurationFileUtil.readFile("job-progress.yaml"));
- assertThat(jobProgress.getInventoryPosition("ds0").size(), is(2));
- assertThat(jobProgress.getInventoryPosition("ds0").get("ds0.t_1"),
instanceOf(FinishedPosition.class));
- assertThat(jobProgress.getInventoryPosition("ds1").get("ds1.t_1"),
instanceOf(PlaceholderPosition.class));
- assertThat(jobProgress.getInventoryPosition("ds1").get("ds1.t_2"),
instanceOf(IntegerPrimaryKeyPosition.class));
+
assertThat(jobProgress.getInventory().getInventoryPosition("ds0").size(),
is(2));
+
assertThat(jobProgress.getInventory().getInventoryPosition("ds0").get("ds0.t_1"),
instanceOf(FinishedPosition.class));
+
assertThat(jobProgress.getInventory().getInventoryPosition("ds1").get("ds1.t_1"),
instanceOf(PlaceholderPosition.class));
+
assertThat(jobProgress.getInventory().getInventoryPosition("ds1").get("ds1.t_2"),
instanceOf(IntegerPrimaryKeyPosition.class));
}
@Test
public void assertGetInventoryFinishedPercentage() {
JobProgress jobProgress =
getJobProgress(ConfigurationFileUtil.readFile("job-progress.yaml"));
- assertThat(jobProgress.getInventoryFinishedPercentage(), is(50));
- }
-
- @Test
- public void assertGetNoFinishedInventoryFinishedPercentage() {
-
assertThat(getJobProgress(ConfigurationFileUtil.readFile("job-progress-no-finished.yaml")).getInventoryFinishedPercentage(),
is(0));
+
assertThat(jobProgress.getInventory().getInventoryFinishedPercentage(), is(50));
}
@Test
public void assertGetAllFinishedInventoryFinishedPercentage() {
-
assertThat(getJobProgress(ConfigurationFileUtil.readFile("job-progress-all-finished.yaml")).getInventoryFinishedPercentage(),
is(100));
+
assertThat(getJobProgress(ConfigurationFileUtil.readFile("job-progress-all-finished.yaml")).getInventory().getInventoryFinishedPercentage(),
is(100));
}
@Test
public void assertGetIncrementalLatestActiveTimeMillis() {
-
assertThat(getJobProgress(ConfigurationFileUtil.readFile("job-progress.yaml")).getIncrementalLatestActiveTimeMillis(),
is(0L));
+
assertThat(getJobProgress(ConfigurationFileUtil.readFile("job-progress.yaml")).getIncremental().getIncrementalLatestActiveTimeMillis(),
is(0L));
}
@Test
public void assertGetIncrementalDataLatestActiveTimeMillis() {
-
assertThat(getJobProgress(ConfigurationFileUtil.readFile("job-progress-all-finished.yaml")).getIncrementalLatestActiveTimeMillis(),
is(50L));
- }
-
- @Test
- public void assertGetNoIncrementalDataLatestActiveTimeMillis() {
-
assertThat(getJobProgress(ConfigurationFileUtil.readFile("job-progress-no-finished.yaml")).getIncrementalLatestActiveTimeMillis(),
is(0L));
+
assertThat(getJobProgress(ConfigurationFileUtil.readFile("job-progress-all-finished.yaml")).getIncremental().getIncrementalLatestActiveTimeMillis(),
is(50L));
}
private JobProgress getJobProgress(final String data) {
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobProgressSwapperTest.java
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobProgressSwapperTest.java
index e650552b2f6..44c46f2e730 100644
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobProgressSwapperTest.java
+++
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlJobProgressSwapperTest.java
@@ -20,13 +20,11 @@ package
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml;
import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
import org.apache.shardingsphere.data.pipeline.core.util.ConfigurationFileUtil;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
-
import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
public final class YamlJobProgressSwapperTest {
@@ -51,13 +49,6 @@ public final class YamlJobProgressSwapperTest {
assertThat(actual.getIncremental().getPosition().length(), is(0));
}
- @Test
- public void assertNullIncremental() {
- JobProgress jobProgress =
getJobProgress(ConfigurationFileUtil.readFile("job-progress-no-finished.yaml"));
- YamlJobProgress actual = SWAPPER.swapToYaml(jobProgress);
- assertNull(actual.getIncremental());
- }
-
@Test
public void assertNullInventory() {
JobProgress jobProgress =
getJobProgress(ConfigurationFileUtil.readFile("job-progress-no-inventory.yaml"));
diff --git
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-no-finished.yaml
b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-no-finished.yaml
deleted file mode 100644
index b0994b68b59..00000000000
---
a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-no-finished.yaml
+++ /dev/null
@@ -1,31 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-#
-#incremental:
-# dataSourceName: ds0
-# delay:
-# lastEventTimestamps: 0
-# latestActiveTimeMillis: 0
-# position: ''
-inventory:
- unfinished:
- ds0.t_1: i,1,2
- ds0.t_2: ''
- ds1.t_2: i,1,2
- ds1.t_1: ''
-sourceDatabaseType: H2
-status: RUNNING