This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b1af12  Enhance unit test of data-pipeline (#15639)
7b1af12 is described below

commit 7b1af121526db89c2a6200fe42984c4aeaa67299
Author: ReyYang <yihui1270730...@163.com>
AuthorDate: Mon Feb 28 15:30:46 2022 +0800

    Enhance unit test of data-pipeline (#15639)
---
 .../api/impl/GovernanceRepositoryAPIImplTest.java  | 12 +++++++-
 .../datasource/AbstractDataSourceCheckerTest.java  | 34 ++++++++++++++++++--
 .../core/job/progress/JobProgressTest.java         | 36 ++++++++++++++++++++++
 .../test/resources/job-progress-all-finished.yaml  | 31 +++++++++++++++++++
 .../test/resources/job-progress-no-finished.yaml   | 31 +++++++++++++++++++
 5 files changed, 140 insertions(+), 4 deletions(-)

diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
index 2ed943b..725ba27 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java
@@ -22,6 +22,7 @@ import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumper
 import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition;
+import org.apache.shardingsphere.data.pipeline.api.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress;
 import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
@@ -89,7 +90,7 @@ public final class GovernanceRepositoryAPIImplTest {
     public void assertDeleteJob() {
         governanceRepositoryAPI.persist(DataPipelineConstants.DATA_PIPELINE_ROOT + "/1", "");
         governanceRepositoryAPI.deleteJob("1");
-        JobProgress actual = governanceRepositoryAPI.getJobProgress("0", 0);
+        JobProgress actual = governanceRepositoryAPI.getJobProgress("1", 0);
         assertNull(actual);
     }
     
@@ -119,6 +120,15 @@ public final class GovernanceRepositoryAPIImplTest {
         assertThat(event.getType(), anyOf(is(Type.ADDED), is(Type.UPDATED)));
     }
     
+    @Test
+    public void assertRenewJobStatus() {
+        RuleAlteredJobContext jobContext = mockJobContext();
+        governanceRepositoryAPI.persistJobProgress(jobContext);
+        governanceRepositoryAPI.renewJobStatus(JobStatus.FINISHED, jobContext.getJobId());
+        JobProgress jobProgress = governanceRepositoryAPI.getJobProgress(jobContext.getJobId(), jobContext.getShardingItem());
+        assertThat(jobProgress.getStatus(), is(JobStatus.FINISHED));
+    }
+    
     private RuleAlteredJobContext mockJobContext() {
         RuleAlteredJobContext result = new RuleAlteredJobContext(ResourceUtil.mockJobConfig());
         TaskConfiguration taskConfig = result.getTaskConfig();
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/datasource/AbstractDataSourceCheckerTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/datasource/AbstractDataSourceCheckerTest.java
index 4fe5fad..85412f4 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/datasource/AbstractDataSourceCheckerTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/datasource/AbstractDataSourceCheckerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.core.check.datasource;
 
+import lombok.SneakyThrows;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException;
 import org.junit.Before;
 import org.junit.Test;
@@ -26,8 +27,11 @@ import org.mockito.junit.MockitoJUnitRunner;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.LinkedList;
 
 import static org.mockito.Mockito.verify;
@@ -39,12 +43,18 @@ public final class AbstractDataSourceCheckerTest {
     @Mock(extraInterfaces = AutoCloseable.class)
     private DataSource dataSource;
     
+    private AbstractDataSourceChecker dataSourceChecker;
+    
+    private Collection<DataSource> dataSources;
+    
     @Mock
     private Connection connection;
     
-    private AbstractDataSourceChecker dataSourceChecker;
+    @Mock
+    private PreparedStatement preparedStatement;
     
-    private Collection<DataSource> dataSources;
+    @Mock
+    private ResultSet resultSet;
     
     @Before
     public void setUp() {
@@ -67,8 +77,9 @@ public final class AbstractDataSourceCheckerTest {
         dataSources.add(dataSource);
     }
     
+    @SneakyThrows
     @Test
-    public void assertCheckConnection() throws SQLException {
+    public void assertCheckConnection() {
         when(dataSource.getConnection()).thenReturn(connection);
         dataSourceChecker.checkConnection(dataSources);
         verify(dataSource).getConnection();
@@ -79,4 +90,21 @@ public final class AbstractDataSourceCheckerTest {
         when(dataSource.getConnection()).thenThrow(new SQLException("error"));
         dataSourceChecker.checkConnection(dataSources);
     }
+    
+    @Test
+    public void assertCheckTargetTable() throws SQLException {
+        when(dataSource.getConnection()).thenReturn(connection);
+        when(connection.prepareStatement("SELECT * FROM `t_order` LIMIT 1")).thenReturn(preparedStatement);
+        when(preparedStatement.executeQuery()).thenReturn(resultSet);
+        dataSourceChecker.checkTargetTable(dataSources, Collections.singletonList("t_order"));
+    }
+    
+    @Test(expected = PipelineJobPrepareFailedException.class)
+    public void assertCheckTargetTableFailed() throws SQLException {
+        when(dataSource.getConnection()).thenReturn(connection);
+        when(connection.prepareStatement("SELECT * FROM `t_order` LIMIT 1")).thenReturn(preparedStatement);
+        when(preparedStatement.executeQuery()).thenReturn(resultSet);
+        when(resultSet.next()).thenReturn(true);
+        dataSourceChecker.checkTargetTable(dataSources, Collections.singletonList("t_order"));
+    }
 }
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/JobProgressTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/JobProgressTest.java
index 0608946..87b3a41 100644
--- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/JobProgressTest.java
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/JobProgressTest.java
@@ -68,4 +68,40 @@ public final class JobProgressTest {
         assertTrue(jobProgress.getInventoryPosition("ds1").get("ds1.t_1") instanceof PlaceholderPosition);
         assertTrue(jobProgress.getInventoryPosition("ds1").get("ds1.t_2") instanceof PrimaryKeyPosition);
     }
+    
+    @Test
+    public void assertGetInventoryFinishedPercentage() {
+        JobProgress jobProgress = getJobProgress(ResourceUtil.readFileAndIgnoreComments("job-progress.yaml"));
+        assertThat(jobProgress.getInventoryFinishedPercentage(), is(50));
+    }
+    
+    @Test
+    public void assertGetNoFinishedInventoryFinishedPercentage() {
+        JobProgress jobProgress = getJobProgress(ResourceUtil.readFileAndIgnoreComments("job-progress-no-finished.yaml"));
+        assertThat(jobProgress.getInventoryFinishedPercentage(), is(0));
+    }
+    
+    @Test
+    public void assertGetAllFinishedInventoryFinishedPercentage() {
+        JobProgress jobProgress = getJobProgress(ResourceUtil.readFileAndIgnoreComments("job-progress-all-finished.yaml"));
+        assertThat(jobProgress.getInventoryFinishedPercentage(), is(100));
+    }
+    
+    @Test
+    public void assertGetIncrementalLatestActiveTimeMillis() {
+        JobProgress jobProgress = getJobProgress(ResourceUtil.readFileAndIgnoreComments("job-progress.yaml"));
+        assertThat(jobProgress.getIncrementalLatestActiveTimeMillis(), is(0L));
+    }
+    
+    @Test
+    public void assertGetIncrementalDataLatestActiveTimeMillis() {
+        JobProgress jobProgress = getJobProgress(ResourceUtil.readFileAndIgnoreComments("job-progress-all-finished.yaml"));
+        assertThat(jobProgress.getIncrementalLatestActiveTimeMillis(), is(50L));
+    }
+    
+    @Test
+    public void assertGetNoIncrementalDataLatestActiveTimeMillis() {
+        JobProgress jobProgress = getJobProgress(ResourceUtil.readFileAndIgnoreComments("job-progress-no-finished.yaml"));
+        assertThat(jobProgress.getIncrementalLatestActiveTimeMillis(), is(0L));
+    }
 }
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-all-finished.yaml b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-all-finished.yaml
new file mode 100644
index 0000000..04ba007
--- /dev/null
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-all-finished.yaml
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+incremental:
+  ds0:
+    delay:
+      lastEventTimestamps: 0
+      latestActiveTimeMillis: 50
+    position: ''
+inventory:
+  finished:
+  - ds0.t_2
+  - ds0.t_1
+  - ds1.t_1
+  - ds1.t_2
+sourceDatabaseType: H2
+status: RUNNING
diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-no-finished.yaml b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-no-finished.yaml
new file mode 100644
index 0000000..138537e
--- /dev/null
+++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-no-finished.yaml
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+# incremental:
+#   ds0:
+#     delay:
+#       lastEventTimestamps: 0
+#       latestActiveTimeMillis: 0
+#     position: ''
+inventory:
+  unfinished:
+    ds0.t_1: 1,2
+    ds0.t_2: ''
+    ds1.t_2: 1,2
+    ds1.t_1: ''
+sourceDatabaseType: H2
+status: RUNNING

Reply via email to