This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 0730fa3bd61 Refactor MySQLIncrementalDumperTest and WALEventConverterTest (#28936)
0730fa3bd61 is described below
commit 0730fa3bd6198424917a2d96676dc893b0e99a52
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Nov 4 14:50:38 2023 +0800
Refactor MySQLIncrementalDumperTest and WALEventConverterTest (#28936)
---
.../mysql/ingest/MySQLIncrementalDumperTest.java | 108 +++++++++++----------
.../ingest/wal/WALEventConverterTest.java | 30 +++---
2 files changed, 74 insertions(+), 64 deletions(-)
diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index f4aea914c98..22ef95c6ab0 100644
--- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -17,14 +17,12 @@
package org.apache.shardingsphere.data.pipeline.mysql.ingest;
-import lombok.SneakyThrows;
import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
import org.apache.shardingsphere.data.pipeline.api.context.ingest.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
-import org.apache.shardingsphere.data.pipeline.api.metadata.ColumnName;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
@@ -55,7 +53,6 @@ import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
-import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
@@ -80,8 +77,8 @@ class MySQLIncrementalDumperTest {
private PipelineTableMetaData pipelineTableMetaData;
@BeforeEach
- void setUp() {
- IncrementalDumperContext dumperContext = mockDumperContext();
+ void setUp() throws SQLException {
+ IncrementalDumperContext dumperContext = createDumperContext();
initTableData(dumperContext);
dumperContext.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:mock://127.0.0.1:3306/test", "root", "root"));
PipelineTableMetaDataLoader metaDataLoader = mock(PipelineTableMetaDataLoader.class);
@@ -91,7 +88,7 @@ class MySQLIncrementalDumperTest {
when(metaDataLoader.getTableMetaData(any(), any())).thenReturn(pipelineTableMetaData);
}
- private IncrementalDumperContext mockDumperContext() {
+ private IncrementalDumperContext createDumperContext() {
IncrementalDumperContext result = new IncrementalDumperContext();
result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL", "root", "root"));
result.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order")));
@@ -99,8 +96,7 @@ class MySQLIncrementalDumperTest {
return result;
}
- @SneakyThrows(SQLException.class)
- private void initTableData(final IncrementalDumperContext dumperContext) {
+ private void initTableData(final IncrementalDumperContext dumperContext) throws SQLException {
try (
PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getDataSourceConfig());
@@ -125,62 +121,70 @@ class MySQLIncrementalDumperTest {
}
@Test
- void assertWriteRowsEventWithoutCustomColumns() throws ReflectiveOperationException {
- assertWriteRowsEvent0(3);
- }
-
- private void assertWriteRowsEvent0(final int expectedColumnCount) throws ReflectiveOperationException {
- WriteRowsEvent rowsEvent = new WriteRowsEvent();
- rowsEvent.setDatabaseName("");
- rowsEvent.setTableName("t_order");
- rowsEvent.setAfterRows(Collections.singletonList(new Serializable[]{101, 1, "OK"}));
- Method method = MySQLIncrementalDumper.class.getDeclaredMethod("handleWriteRowsEvent", WriteRowsEvent.class, PipelineTableMetaData.class);
- List<Record> actual = (List<Record>) Plugins.getMemberAccessor().invoke(method, incrementalDumper, rowsEvent, pipelineTableMetaData);
+ void assertWriteRowsEvent() throws ReflectiveOperationException {
+ List<Record> actual = getRecordsByWriteRowsEvent(createWriteRowsEvent());
assertThat(actual.size(), is(1));
assertThat(actual.get(0), instanceOf(DataRecord.class));
assertThat(((DataRecord) actual.get(0)).getType(), is(IngestDataChangeType.INSERT));
- assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(expectedColumnCount));
+ assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(3));
}
- private Map<LogicTableName, Collection<ColumnName>> mockTargetTableColumnsMap() {
- return Collections.singletonMap(new LogicTableName("t_order"), Collections.singleton(new ColumnName("order_id")));
+ private WriteRowsEvent createWriteRowsEvent() {
+ WriteRowsEvent result = new WriteRowsEvent();
+ result.setDatabaseName("");
+ result.setTableName("t_order");
+ result.setAfterRows(Collections.singletonList(new Serializable[]{101, 1, "OK"}));
+ return result;
}
- @Test
- void assertUpdateRowsEventWithoutCustomColumns() throws ReflectiveOperationException {
- assertUpdateRowsEvent0(3);
+ private List<Record> getRecordsByWriteRowsEvent(final WriteRowsEvent rowsEvent) throws ReflectiveOperationException {
+ Method method = MySQLIncrementalDumper.class.getDeclaredMethod("handleWriteRowsEvent", WriteRowsEvent.class, PipelineTableMetaData.class);
+ return (List<Record>) Plugins.getMemberAccessor().invoke(method, incrementalDumper, rowsEvent, pipelineTableMetaData);
}
- private void assertUpdateRowsEvent0(final int expectedColumnCount) throws ReflectiveOperationException {
- UpdateRowsEvent rowsEvent = new UpdateRowsEvent();
- rowsEvent.setDatabaseName("test");
- rowsEvent.setTableName("t_order");
- rowsEvent.setBeforeRows(Collections.singletonList(new Serializable[]{101, 1, "OK"}));
- rowsEvent.setAfterRows(Collections.singletonList(new Serializable[]{101, 1, "OK2"}));
- Method method = MySQLIncrementalDumper.class.getDeclaredMethod("handleUpdateRowsEvent", UpdateRowsEvent.class, PipelineTableMetaData.class);
- List<Record> actual = (List<Record>) Plugins.getMemberAccessor().invoke(method, incrementalDumper, rowsEvent, pipelineTableMetaData);
+ @Test
+ void assertUpdateRowsEvent() throws ReflectiveOperationException {
+ List<Record> actual = getRecordsByUpdateRowsEvent(createUpdateRowsEvent());
assertThat(actual.size(), is(1));
assertThat(actual.get(0), instanceOf(DataRecord.class));
assertThat(((DataRecord) actual.get(0)).getType(), is(IngestDataChangeType.UPDATE));
- assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(expectedColumnCount));
+ assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(3));
}
- @Test
- void assertDeleteRowsEventWithoutCustomColumns() throws ReflectiveOperationException {
- assertDeleteRowsEvent0(3);
+ private UpdateRowsEvent createUpdateRowsEvent() {
+ UpdateRowsEvent result = new UpdateRowsEvent();
+ result.setDatabaseName("test");
+ result.setTableName("t_order");
+ result.setBeforeRows(Collections.singletonList(new Serializable[]{101, 1, "OK"}));
+ result.setAfterRows(Collections.singletonList(new Serializable[]{101, 1, "OK2"}));
+ return result;
}
- private void assertDeleteRowsEvent0(final int expectedColumnCount) throws ReflectiveOperationException {
- DeleteRowsEvent rowsEvent = new DeleteRowsEvent();
- rowsEvent.setDatabaseName("");
- rowsEvent.setTableName("t_order");
- rowsEvent.setBeforeRows(Collections.singletonList(new Serializable[]{101, 1, "OK"}));
- Method method = MySQLIncrementalDumper.class.getDeclaredMethod("handleDeleteRowsEvent", DeleteRowsEvent.class, PipelineTableMetaData.class);
- List<Record> actual = (List<Record>) Plugins.getMemberAccessor().invoke(method, incrementalDumper, rowsEvent, pipelineTableMetaData);
+ private List<Record> getRecordsByUpdateRowsEvent(final UpdateRowsEvent rowsEvent) throws ReflectiveOperationException {
+ Method method = MySQLIncrementalDumper.class.getDeclaredMethod("handleUpdateRowsEvent", UpdateRowsEvent.class, PipelineTableMetaData.class);
+ return (List<Record>) Plugins.getMemberAccessor().invoke(method, incrementalDumper, rowsEvent, pipelineTableMetaData);
+ }
+
+ @Test
+ void assertDeleteRowsEvent() throws ReflectiveOperationException {
+ List<Record> actual = getRecordsByDeleteRowsEvent(createDeleteRowsEvent());
assertThat(actual.size(), is(1));
assertThat(actual.get(0), instanceOf(DataRecord.class));
assertThat(((DataRecord) actual.get(0)).getType(), is(IngestDataChangeType.DELETE));
- assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(expectedColumnCount));
+ assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(3));
+ }
+
+ private DeleteRowsEvent createDeleteRowsEvent() {
+ DeleteRowsEvent result = new DeleteRowsEvent();
+ result.setDatabaseName("");
+ result.setTableName("t_order");
+ result.setBeforeRows(Collections.singletonList(new Serializable[]{101, 1, "OK"}));
+ return result;
+ }
+
+ private List<Record> getRecordsByDeleteRowsEvent(final DeleteRowsEvent rowsEvent) throws ReflectiveOperationException {
+ Method method = MySQLIncrementalDumper.class.getDeclaredMethod("handleDeleteRowsEvent", DeleteRowsEvent.class, PipelineTableMetaData.class);
+ return (List<Record>) Plugins.getMemberAccessor().invoke(method, incrementalDumper, rowsEvent, pipelineTableMetaData);
}
@Test
@@ -192,13 +196,17 @@ class MySQLIncrementalDumperTest {
@Test
void assertRowsEventFiltered() throws ReflectiveOperationException {
- WriteRowsEvent rowsEvent = new WriteRowsEvent();
- rowsEvent.setDatabaseName("test");
- rowsEvent.setTableName("t_order");
- rowsEvent.setAfterRows(Collections.singletonList(new Serializable[]{1}));
List<Record> actual = (List<Record>) Plugins.getMemberAccessor().invoke(MySQLIncrementalDumper.class.getDeclaredMethod("handleEvent", AbstractBinlogEvent.class),
- incrementalDumper, rowsEvent);
+ incrementalDumper, getFilteredWriteRowsEvent());
assertThat(actual.size(), is(1));
assertThat(actual.get(0), instanceOf(DataRecord.class));
}
+
+ private WriteRowsEvent getFilteredWriteRowsEvent() {
+ WriteRowsEvent result = new WriteRowsEvent();
+ result.setDatabaseName("test");
+ result.setTableName("t_order");
+ result.setAfterRows(Collections.singletonList(new Serializable[]{1}));
+ return result;
+ }
}
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
index 1197dfefd82..16ae05b1bc3 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal;
-import lombok.SneakyThrows;
import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
import org.apache.shardingsphere.data.pipeline.api.context.ingest.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
@@ -76,7 +75,7 @@ class WALEventConverterTest {
private PipelineTableMetaData pipelineTableMetaData;
@BeforeEach
- void setUp() {
+ void setUp() throws SQLException {
IncrementalDumperContext dumperContext = mockDumperContext();
PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
walEventConverter = new WALEventConverter(dumperContext, new StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperContext.getDataSourceConfig())));
@@ -92,8 +91,7 @@ class WALEventConverterTest {
return result;
}
- @SneakyThrows(SQLException.class)
- private void initTableData(final IncrementalDumperContext dumperContext) {
+ private void initTableData(final IncrementalDumperContext dumperContext) throws SQLException {
try (
PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getDataSourceConfig());
@@ -118,19 +116,23 @@ class WALEventConverterTest {
}
@Test
- void assertWriteRowEventWithoutCustomColumns() throws ReflectiveOperationException {
- assertWriteRowEvent0(3);
+ void assertWriteRowEvent() throws ReflectiveOperationException {
+ DataRecord actual = getDataRecord(createWriteRowEvent());
+ assertThat(actual.getType(), is(IngestDataChangeType.INSERT));
+ assertThat(actual.getColumnCount(), is(3));
+ }
+
+ private WriteRowEvent createWriteRowEvent() {
+ WriteRowEvent result = new WriteRowEvent();
+ result.setSchemaName("");
+ result.setTableName("t_order");
+ result.setAfterRow(Arrays.asList(101, 1, "OK"));
+ return result;
}
- private void assertWriteRowEvent0(final int expectedColumnCount) throws ReflectiveOperationException {
- WriteRowEvent rowsEvent = new WriteRowEvent();
- rowsEvent.setSchemaName("");
- rowsEvent.setTableName("t_order");
- rowsEvent.setAfterRow(Arrays.asList(101, 1, "OK"));
+ private DataRecord getDataRecord(final WriteRowEvent rowsEvent) throws ReflectiveOperationException {
Method method = WALEventConverter.class.getDeclaredMethod("handleWriteRowEvent", WriteRowEvent.class, PipelineTableMetaData.class);
- DataRecord actual = (DataRecord) Plugins.getMemberAccessor().invoke(method, walEventConverter, rowsEvent, pipelineTableMetaData);
- assertThat(actual.getType(), is(IngestDataChangeType.INSERT));
- assertThat(actual.getColumnCount(), is(expectedColumnCount));
+ return (DataRecord) Plugins.getMemberAccessor().invoke(method, walEventConverter, rowsEvent, pipelineTableMetaData);
}
@Test