This is an automated email from the ASF dual-hosted git repository.
azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 607ba1138f6 Pipeline E2E compatible with
com.mysql:mysql-connector-j:8.0 (#28853)
607ba1138f6 is described below
commit 607ba1138f625315fe0ec760b8ad772f5659fb0b
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Tue Oct 24 17:55:40 2023 +0800
Pipeline E2E compatible with com.mysql:mysql-connector-j:8.0 (#28853)
* Pipeline E2E uses statement.execute and getResultSet to be compatible with
com.mysql:mysql-connector-j:8.0 for extended SQL
* Revert pipeline E2E compatibility code of com.mysql:mysql-connector-j:8.0
* Disable CDCE2EIT for now
---
.../pipeline/cases/PipelineContainerComposer.java | 9 ++++++---
.../general/MySQLMigrationGeneralE2EIT.java | 11 ++++++++---
.../primarykey/IndexesMigrationE2EIT.java | 23 ++++++++++++++++------
3 files changed, 31 insertions(+), 12 deletions(-)
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
index 7095b587f94..fa86987e489 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
@@ -61,6 +61,7 @@ import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
+import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -420,9 +421,11 @@ public final class PipelineContainerComposer implements
AutoCloseable {
*/
public List<Map<String, Object>> queryForListWithLog(final DataSource
dataSource, final String sql) {
log.info("Query SQL: {}", sql);
- try (Connection connection = dataSource.getConnection()) {
- ResultSet resultSet =
connection.createStatement().executeQuery(sql);
- return transformResultSetToList(resultSet);
+ try (
+ Connection connection = dataSource.getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute(sql);
+ return transformResultSetToList(statement.getResultSet());
} catch (final SQLException ex) {
throw new RuntimeException(ex);
}
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
index 07317d6d5a3..e48d3d4c572 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
@@ -38,6 +38,7 @@ import org.junit.jupiter.api.condition.EnabledIf;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
+import javax.sql.DataSource;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.util.List;
@@ -82,14 +83,17 @@ class MySQLMigrationGeneralE2EIT extends
AbstractMigrationE2EIT {
containerComposer.startIncrementTask(
new
E2EIncrementalTask(containerComposer.getSourceDataSource(), SOURCE_TABLE_NAME,
new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 30));
TimeUnit.SECONDS.timedJoin(containerComposer.getIncreaseTaskThread(), 30);
+ containerComposer.sourceExecuteWithLog(String.format("INSERT INTO
%s (order_id, user_id, status) VALUES (10000, 1, 'OK')", SOURCE_TABLE_NAME));
+ containerComposer.sourceExecuteWithLog("INSERT INTO t_order_item
(item_id, order_id, user_id, status) VALUES (10000, 10000, 1, 'OK')");
stopMigrationByJobId(containerComposer, orderJobId);
startMigrationByJobId(containerComposer, orderJobId);
- containerComposer.waitIncrementTaskFinished(String.format("SHOW
MIGRATION STATUS '%s'", orderJobId));
- String orderItemJobId = getJobIdByTableName(containerComposer,
"ds_0.t_order_item");
- containerComposer.waitIncrementTaskFinished(String.format("SHOW
MIGRATION STATUS '%s'", orderItemJobId));
+ DataSource jdbcDataSource =
containerComposer.generateShardingSphereDataSourceFromProxy();
+ containerComposer.assertOrderRecordExist(jdbcDataSource,
"t_order", 10000);
+ containerComposer.assertOrderRecordExist(jdbcDataSource,
"t_order_item", 10000);
Properties algorithmProps = new Properties();
algorithmProps.setProperty("chunk-size", "300");
assertMigrationSuccessById(containerComposer, orderJobId,
"DATA_MATCH", algorithmProps);
+ String orderItemJobId = getJobIdByTableName(containerComposer,
"ds_0.t_order_item");
assertMigrationSuccessById(containerComposer, orderItemJobId,
"DATA_MATCH", algorithmProps);
Awaitility.await().pollDelay(2L, TimeUnit.SECONDS).until(() ->
true);
assertMigrationSuccessById(containerComposer, orderItemJobId,
"CRC32_MATCH", new Properties());
@@ -97,6 +101,7 @@ class MySQLMigrationGeneralE2EIT extends
AbstractMigrationE2EIT {
commitMigrationByJobId(containerComposer, each);
}
assertTrue(listJobId(containerComposer).isEmpty());
+
containerComposer.assertGreaterThanOrderTableInitRows(jdbcDataSource,
PipelineContainerComposer.TABLE_INIT_ROW_COUNT, "");
}
}
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index 8c2b40dd2e1..d4b01eee705 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -18,6 +18,7 @@
package
org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.primarykey;
import lombok.SneakyThrows;
+import org.apache.commons.codec.binary.Hex;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType;
@@ -39,6 +40,7 @@ import org.junit.jupiter.api.condition.EnabledIf;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
+import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
@@ -90,12 +92,13 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
}
KeyGenerateAlgorithm keyGenerateAlgorithm = new
UUIDKeyGenerateAlgorithm();
// TODO PostgreSQL update delete events not support if table
without unique keys at increment task.
- final Consumer<Void> incrementalTaskFn = unused -> {
+ final Consumer<DataSource> incrementalTaskFn = dataSource -> {
if (containerComposer.getDatabaseType() instanceof
MySQLDatabaseType) {
doCreateUpdateDelete(containerComposer,
keyGenerateAlgorithm.generateKey());
}
Object orderId = keyGenerateAlgorithm.generateKey();
insertOneOrder(containerComposer, orderId);
+ containerComposer.assertOrderRecordExist(dataSource,
"t_order", orderId);
};
assertMigrationSuccess(containerComposer, sql, "user_id",
keyGenerateAlgorithm, consistencyCheckAlgorithmType, incrementalTaskFn);
}
@@ -169,9 +172,10 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT
{
}
KeyGenerateAlgorithm keyGenerateAlgorithm = new
UUIDKeyGenerateAlgorithm();
Object uniqueKey = keyGenerateAlgorithm.generateKey();
- assertMigrationSuccess(containerComposer, sql, "user_id",
keyGenerateAlgorithm, consistencyCheckAlgorithmType, unused -> {
+ assertMigrationSuccess(containerComposer, sql, "user_id",
keyGenerateAlgorithm, consistencyCheckAlgorithmType, dataSource -> {
insertOneOrder(containerComposer, uniqueKey);
doCreateUpdateDelete(containerComposer,
keyGenerateAlgorithm.generateKey());
+ containerComposer.assertOrderRecordExist(dataSource,
"t_order", uniqueKey);
});
}
}
@@ -191,9 +195,10 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT
{
}
KeyGenerateAlgorithm keyGenerateAlgorithm = new
AutoIncrementKeyGenerateAlgorithm();
Object uniqueKey = keyGenerateAlgorithm.generateKey();
- assertMigrationSuccess(containerComposer, sql, "user_id",
keyGenerateAlgorithm, consistencyCheckAlgorithmType, unused -> {
+ assertMigrationSuccess(containerComposer, sql, "user_id",
keyGenerateAlgorithm, consistencyCheckAlgorithmType, dataSource -> {
insertOneOrder(containerComposer, uniqueKey);
doCreateUpdateDelete(containerComposer,
keyGenerateAlgorithm.generateKey());
+ containerComposer.assertOrderRecordExist(dataSource,
"t_order", uniqueKey);
});
}
}
@@ -215,12 +220,16 @@ class IndexesMigrationE2EIT extends
AbstractMigrationE2EIT {
KeyGenerateAlgorithm keyGenerateAlgorithm = new
UUIDKeyGenerateAlgorithm();
// TODO Insert binary string in VARBINARY column. But
KeyGenerateAlgorithm.generateKey() require returning Comparable, and byte[] is
not Comparable
byte[] uniqueKey = new byte[]{-1, 0, 1};
- assertMigrationSuccess(containerComposer, sql, "order_id",
keyGenerateAlgorithm, consistencyCheckAlgorithmType, unused ->
insertOneOrder(containerComposer, uniqueKey));
+ assertMigrationSuccess(containerComposer, sql, "order_id",
keyGenerateAlgorithm, consistencyCheckAlgorithmType, dataSource -> {
+ insertOneOrder(containerComposer, uniqueKey);
+ // TODO Select by byte[] from proxy doesn't work, so unhex
function is used for now
+ containerComposer.assertOrderRecordExist(dataSource,
String.format("SELECT 1 FROM t_order WHERE order_id=UNHEX('%s')",
Hex.encodeHexString(uniqueKey)));
+ });
}
}
private void assertMigrationSuccess(final PipelineContainerComposer
containerComposer, final String sqlPattern, final String shardingColumn, final
KeyGenerateAlgorithm keyGenerateAlgorithm,
- final String
consistencyCheckAlgorithmType, final Consumer<Void> incrementalTaskFn) throws
Exception {
+ final String
consistencyCheckAlgorithmType, final Consumer<DataSource> incrementalTaskFn)
throws Exception {
containerComposer.sourceExecuteWithLog(String.format(sqlPattern,
SOURCE_TABLE_NAME));
try (Connection connection =
containerComposer.getSourceDataSource().getConnection()) {
PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection,
keyGenerateAlgorithm, SOURCE_TABLE_NAME,
PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
@@ -232,12 +241,14 @@ class IndexesMigrationE2EIT extends
AbstractMigrationE2EIT {
startMigration(containerComposer, SOURCE_TABLE_NAME,
TARGET_TABLE_NAME);
String jobId = listJobId(containerComposer).get(0);
containerComposer.waitJobPrepareSuccess(String.format("SHOW MIGRATION
STATUS '%s'", jobId));
- incrementalTaskFn.accept(null);
+ DataSource jdbcDataSource =
containerComposer.generateShardingSphereDataSourceFromProxy();
+ incrementalTaskFn.accept(jdbcDataSource);
containerComposer.waitIncrementTaskFinished(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
if (null != consistencyCheckAlgorithmType) {
assertCheckMigrationSuccess(containerComposer, jobId,
consistencyCheckAlgorithmType);
}
commitMigrationByJobId(containerComposer, jobId);
+
assertThat(containerComposer.getTargetTableRecordsCount(jdbcDataSource,
SOURCE_TABLE_NAME), is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1));
List<String> lastJobIds = listJobId(containerComposer);
assertTrue(lastJobIds.isEmpty());
}