This is an automated email from the ASF dual-hosted git repository. zhonghongsheng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push: new 7b1af12 Enhance unit test of data-pipeline (#15639) 7b1af12 is described below commit 7b1af121526db89c2a6200fe42984c4aeaa67299 Author: ReyYang <yihui1270730...@163.com> AuthorDate: Mon Feb 28 15:30:46 2022 +0800 Enhance unit test of data-pipeline (#15639) --- .../api/impl/GovernanceRepositoryAPIImplTest.java | 12 +++++++- .../datasource/AbstractDataSourceCheckerTest.java | 34 ++++++++++++++++++-- .../core/job/progress/JobProgressTest.java | 36 ++++++++++++++++++++++ .../test/resources/job-progress-all-finished.yaml | 31 +++++++++++++++++++ .../test/resources/job-progress-no-finished.yaml | 31 +++++++++++++++++++ 5 files changed, 140 insertions(+), 4 deletions(-) diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java index 2ed943b..725ba27 100644 --- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java +++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/api/impl/GovernanceRepositoryAPIImplTest.java @@ -22,6 +22,7 @@ import org.apache.shardingsphere.data.pipeline.api.config.ingest.InventoryDumper import org.apache.shardingsphere.data.pipeline.api.config.rulealtered.TaskConfiguration; import org.apache.shardingsphere.data.pipeline.api.datasource.PipelineDataSourceWrapper; import org.apache.shardingsphere.data.pipeline.api.ingest.position.PlaceholderPosition; +import org.apache.shardingsphere.data.pipeline.api.job.JobStatus; import org.apache.shardingsphere.data.pipeline.api.job.progress.JobProgress; import org.apache.shardingsphere.data.pipeline.core.api.GovernanceRepositoryAPI; import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory; @@ -89,7 +90,7 @@ public final class GovernanceRepositoryAPIImplTest { public void assertDeleteJob() { governanceRepositoryAPI.persist(DataPipelineConstants.DATA_PIPELINE_ROOT + "/1", ""); governanceRepositoryAPI.deleteJob("1"); - JobProgress actual = governanceRepositoryAPI.getJobProgress("0", 0); + JobProgress actual = governanceRepositoryAPI.getJobProgress("1", 0); assertNull(actual); } @@ -119,6 +120,15 @@ public final class GovernanceRepositoryAPIImplTest { assertThat(event.getType(), anyOf(is(Type.ADDED), is(Type.UPDATED))); } + @Test + public void assertRenewJobStatus() { + RuleAlteredJobContext jobContext = mockJobContext(); + governanceRepositoryAPI.persistJobProgress(jobContext); + governanceRepositoryAPI.renewJobStatus(JobStatus.FINISHED, jobContext.getJobId()); + JobProgress jobProgress = governanceRepositoryAPI.getJobProgress(jobContext.getJobId(), jobContext.getShardingItem()); + assertThat(jobProgress.getStatus(), is(JobStatus.FINISHED)); + } + private RuleAlteredJobContext mockJobContext() { RuleAlteredJobContext result = new RuleAlteredJobContext(ResourceUtil.mockJobConfig()); TaskConfiguration taskConfig = result.getTaskConfig(); diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/datasource/AbstractDataSourceCheckerTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/datasource/AbstractDataSourceCheckerTest.java index 4fe5fad..85412f4 100644 --- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/datasource/AbstractDataSourceCheckerTest.java +++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/check/datasource/AbstractDataSourceCheckerTest.java @@ -17,6 +17,7 @@ package org.apache.shardingsphere.data.pipeline.core.check.datasource; +import lombok.SneakyThrows; import org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobPrepareFailedException; import org.junit.Before; import org.junit.Test; @@ -26,8 +27,11 @@ import org.mockito.junit.MockitoJUnitRunner; import javax.sql.DataSource; import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.sql.SQLException; import java.util.Collection; +import java.util.Collections; import java.util.LinkedList; import static org.mockito.Mockito.verify; @@ -39,12 +43,18 @@ public final class AbstractDataSourceCheckerTest { @Mock(extraInterfaces = AutoCloseable.class) private DataSource dataSource; + private AbstractDataSourceChecker dataSourceChecker; + + private Collection<DataSource> dataSources; + @Mock private Connection connection; - private AbstractDataSourceChecker dataSourceChecker; + @Mock + private PreparedStatement preparedStatement; - private Collection<DataSource> dataSources; + @Mock + private ResultSet resultSet; @Before public void setUp() { @@ -67,8 +77,9 @@ public final class AbstractDataSourceCheckerTest { dataSources.add(dataSource); } + @SneakyThrows @Test - public void assertCheckConnection() throws SQLException { + public void assertCheckConnection() { when(dataSource.getConnection()).thenReturn(connection); dataSourceChecker.checkConnection(dataSources); verify(dataSource).getConnection(); @@ -79,4 +90,21 @@ public final class AbstractDataSourceCheckerTest { when(dataSource.getConnection()).thenThrow(new SQLException("error")); dataSourceChecker.checkConnection(dataSources); } + + @Test + public void assertCheckTargetTable() throws SQLException { + when(dataSource.getConnection()).thenReturn(connection); + when(connection.prepareStatement("SELECT * FROM `t_order` LIMIT 1")).thenReturn(preparedStatement); + when(preparedStatement.executeQuery()).thenReturn(resultSet); + dataSourceChecker.checkTargetTable(dataSources, Collections.singletonList("t_order")); + } + + @Test(expected = PipelineJobPrepareFailedException.class) + public void assertCheckTargetTableFailed() throws SQLException { + when(dataSource.getConnection()).thenReturn(connection); + when(connection.prepareStatement("SELECT * FROM `t_order` LIMIT 1")).thenReturn(preparedStatement); + when(preparedStatement.executeQuery()).thenReturn(resultSet); + when(resultSet.next()).thenReturn(true); + dataSourceChecker.checkTargetTable(dataSources, Collections.singletonList("t_order")); + } } diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/JobProgressTest.java b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/JobProgressTest.java index 0608946..87b3a41 100644 --- a/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/JobProgressTest.java +++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/java/org/apache/shardingsphere/data/pipeline/core/job/progress/JobProgressTest.java @@ -68,4 +68,40 @@ public final class JobProgressTest { assertTrue(jobProgress.getInventoryPosition("ds1").get("ds1.t_1") instanceof PlaceholderPosition); assertTrue(jobProgress.getInventoryPosition("ds1").get("ds1.t_2") instanceof PrimaryKeyPosition); } + + @Test + public void assertGetInventoryFinishedPercentage() { + JobProgress jobProgress = getJobProgress(ResourceUtil.readFileAndIgnoreComments("job-progress.yaml")); + assertThat(jobProgress.getInventoryFinishedPercentage(), is(50)); + } + + @Test + public void assertGetNoFinishedInventoryFinishedPercentage() { + JobProgress jobProgress = getJobProgress(ResourceUtil.readFileAndIgnoreComments("job-progress-no-finished.yaml")); + assertThat(jobProgress.getInventoryFinishedPercentage(), is(0)); + } + + @Test + public void assertGetAllFinishedInventoryFinishedPercentage() { + JobProgress jobProgress = getJobProgress(ResourceUtil.readFileAndIgnoreComments("job-progress-all-finished.yaml")); + assertThat(jobProgress.getInventoryFinishedPercentage(), is(100)); + } + + @Test + public void assertGetIncrementalLatestActiveTimeMillis() { + JobProgress jobProgress = getJobProgress(ResourceUtil.readFileAndIgnoreComments("job-progress.yaml")); + assertThat(jobProgress.getIncrementalLatestActiveTimeMillis(), is(0L)); + } + + @Test + public void assertGetIncrementalDataLatestActiveTimeMillis() { + JobProgress jobProgress = getJobProgress(ResourceUtil.readFileAndIgnoreComments("job-progress-all-finished.yaml")); + assertThat(jobProgress.getIncrementalLatestActiveTimeMillis(), is(50L)); + } + + @Test + public void assertGetNoIncrementalDataLatestActiveTimeMillis() { + JobProgress jobProgress = getJobProgress(ResourceUtil.readFileAndIgnoreComments("job-progress-no-finished.yaml")); + assertThat(jobProgress.getIncrementalLatestActiveTimeMillis(), is(0L)); + } } diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-all-finished.yaml b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-all-finished.yaml new file mode 100644 index 0000000..04ba007 --- /dev/null +++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-all-finished.yaml @@ -0,0 +1,31 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +incremental: + ds0: + delay: + lastEventTimestamps: 0 + latestActiveTimeMillis: 50 + position: '' +inventory: + finished: + - ds0.t_2 + - ds0.t_1 + - ds1.t_1 + - ds1.t_2 +sourceDatabaseType: H2 +status: RUNNING diff --git a/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-no-finished.yaml b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-no-finished.yaml new file mode 100644 index 0000000..138537e --- /dev/null +++ b/shardingsphere-test/shardingsphere-pipeline-test/src/test/resources/job-progress-no-finished.yaml @@ -0,0 +1,31 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# incremental: +# ds0: +# delay: +# lastEventTimestamps: 0 +# latestActiveTimeMillis: 0 +# position: '' +inventory: + unfinished: + ds0.t_1: 1,2 + ds0.t_2: '' + ds1.t_2: 1,2 + ds1.t_1: '' +sourceDatabaseType: H2 +status: RUNNING