This is an automated email from the ASF dual-hosted git repository.

azexin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 607ba1138f6 Pipeline E2E compatible with 
com.mysql:mysql-connector-j:8.0 (#28853)
607ba1138f6 is described below

commit 607ba1138f625315fe0ec760b8ad772f5659fb0b
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Tue Oct 24 17:55:40 2023 +0800

    Pipeline E2E compatible with com.mysql:mysql-connector-j:8.0 (#28853)
    
    * Pipeline E2E uses statement.execute and getResultSet to be compatible with 
com.mysql:mysql-connector-j:8.0 for extended SQL
    
    * Revert pipeline E2E compatibility code of com.mysql:mysql-connector-j:8.0
    
    * Disable CDCE2EIT for now
---
 .../pipeline/cases/PipelineContainerComposer.java  |  9 ++++++---
 .../general/MySQLMigrationGeneralE2EIT.java        | 11 ++++++++---
 .../primarykey/IndexesMigrationE2EIT.java          | 23 ++++++++++++++++------
 3 files changed, 31 insertions(+), 12 deletions(-)

diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
index 7095b587f94..fa86987e489 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
@@ -61,6 +61,7 @@ import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -420,9 +421,11 @@ public final class PipelineContainerComposer implements 
AutoCloseable {
      */
     public List<Map<String, Object>> queryForListWithLog(final DataSource 
dataSource, final String sql) {
         log.info("Query SQL: {}", sql);
-        try (Connection connection = dataSource.getConnection()) {
-            ResultSet resultSet = 
connection.createStatement().executeQuery(sql);
-            return transformResultSetToList(resultSet);
+        try (
+                Connection connection = dataSource.getConnection();
+                Statement statement = connection.createStatement()) {
+            statement.execute(sql);
+            return transformResultSetToList(statement.getResultSet());
         } catch (final SQLException ex) {
             throw new RuntimeException(ex);
         }
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
index 07317d6d5a3..e48d3d4c572 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
@@ -38,6 +38,7 @@ import org.junit.jupiter.api.condition.EnabledIf;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ArgumentsSource;
 
+import javax.sql.DataSource;
 import java.sql.SQLException;
 import java.time.LocalDateTime;
 import java.util.List;
@@ -82,14 +83,17 @@ class MySQLMigrationGeneralE2EIT extends 
AbstractMigrationE2EIT {
             containerComposer.startIncrementTask(
                     new 
E2EIncrementalTask(containerComposer.getSourceDataSource(), SOURCE_TABLE_NAME, 
new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 30));
             
TimeUnit.SECONDS.timedJoin(containerComposer.getIncreaseTaskThread(), 30);
+            containerComposer.sourceExecuteWithLog(String.format("INSERT INTO 
%s (order_id, user_id, status) VALUES (10000, 1, 'OK')", SOURCE_TABLE_NAME));
+            containerComposer.sourceExecuteWithLog("INSERT INTO t_order_item 
(item_id, order_id, user_id, status) VALUES (10000, 10000, 1, 'OK')");
             stopMigrationByJobId(containerComposer, orderJobId);
             startMigrationByJobId(containerComposer, orderJobId);
-            containerComposer.waitIncrementTaskFinished(String.format("SHOW 
MIGRATION STATUS '%s'", orderJobId));
-            String orderItemJobId = getJobIdByTableName(containerComposer, 
"ds_0.t_order_item");
-            containerComposer.waitIncrementTaskFinished(String.format("SHOW 
MIGRATION STATUS '%s'", orderItemJobId));
+            DataSource jdbcDataSource = 
containerComposer.generateShardingSphereDataSourceFromProxy();
+            containerComposer.assertOrderRecordExist(jdbcDataSource, 
"t_order", 10000);
+            containerComposer.assertOrderRecordExist(jdbcDataSource, 
"t_order_item", 10000);
             Properties algorithmProps = new Properties();
             algorithmProps.setProperty("chunk-size", "300");
             assertMigrationSuccessById(containerComposer, orderJobId, 
"DATA_MATCH", algorithmProps);
+            String orderItemJobId = getJobIdByTableName(containerComposer, 
"ds_0.t_order_item");
             assertMigrationSuccessById(containerComposer, orderItemJobId, 
"DATA_MATCH", algorithmProps);
             Awaitility.await().pollDelay(2L, TimeUnit.SECONDS).until(() -> 
true);
             assertMigrationSuccessById(containerComposer, orderItemJobId, 
"CRC32_MATCH", new Properties());
@@ -97,6 +101,7 @@ class MySQLMigrationGeneralE2EIT extends 
AbstractMigrationE2EIT {
                 commitMigrationByJobId(containerComposer, each);
             }
             assertTrue(listJobId(containerComposer).isEmpty());
+            
containerComposer.assertGreaterThanOrderTableInitRows(jdbcDataSource, 
PipelineContainerComposer.TABLE_INIT_ROW_COUNT, "");
         }
     }
     
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index 8c2b40dd2e1..d4b01eee705 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -18,6 +18,7 @@
 package 
org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.primarykey;
 
 import lombok.SneakyThrows;
+import org.apache.commons.codec.binary.Hex;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType;
@@ -39,6 +40,7 @@ import org.junit.jupiter.api.condition.EnabledIf;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ArgumentsSource;
 
+import javax.sql.DataSource;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
@@ -90,12 +92,13 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
             }
             KeyGenerateAlgorithm keyGenerateAlgorithm = new 
UUIDKeyGenerateAlgorithm();
             // TODO PostgreSQL update delete events not support if table 
without unique keys at increment task.
-            final Consumer<Void> incrementalTaskFn = unused -> {
+            final Consumer<DataSource> incrementalTaskFn = dataSource -> {
                 if (containerComposer.getDatabaseType() instanceof 
MySQLDatabaseType) {
                     doCreateUpdateDelete(containerComposer, 
keyGenerateAlgorithm.generateKey());
                 }
                 Object orderId = keyGenerateAlgorithm.generateKey();
                 insertOneOrder(containerComposer, orderId);
+                containerComposer.assertOrderRecordExist(dataSource, 
"t_order", orderId);
             };
             assertMigrationSuccess(containerComposer, sql, "user_id", 
keyGenerateAlgorithm, consistencyCheckAlgorithmType, incrementalTaskFn);
         }
@@ -169,9 +172,10 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT 
{
             }
             KeyGenerateAlgorithm keyGenerateAlgorithm = new 
UUIDKeyGenerateAlgorithm();
             Object uniqueKey = keyGenerateAlgorithm.generateKey();
-            assertMigrationSuccess(containerComposer, sql, "user_id", 
keyGenerateAlgorithm, consistencyCheckAlgorithmType, unused -> {
+            assertMigrationSuccess(containerComposer, sql, "user_id", 
keyGenerateAlgorithm, consistencyCheckAlgorithmType, dataSource -> {
                 insertOneOrder(containerComposer, uniqueKey);
                 doCreateUpdateDelete(containerComposer, 
keyGenerateAlgorithm.generateKey());
+                containerComposer.assertOrderRecordExist(dataSource, 
"t_order", uniqueKey);
             });
         }
     }
@@ -191,9 +195,10 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT 
{
             }
             KeyGenerateAlgorithm keyGenerateAlgorithm = new 
AutoIncrementKeyGenerateAlgorithm();
             Object uniqueKey = keyGenerateAlgorithm.generateKey();
-            assertMigrationSuccess(containerComposer, sql, "user_id", 
keyGenerateAlgorithm, consistencyCheckAlgorithmType, unused -> {
+            assertMigrationSuccess(containerComposer, sql, "user_id", 
keyGenerateAlgorithm, consistencyCheckAlgorithmType, dataSource -> {
                 insertOneOrder(containerComposer, uniqueKey);
                 doCreateUpdateDelete(containerComposer, 
keyGenerateAlgorithm.generateKey());
+                containerComposer.assertOrderRecordExist(dataSource, 
"t_order", uniqueKey);
             });
         }
     }
@@ -215,12 +220,16 @@ class IndexesMigrationE2EIT extends 
AbstractMigrationE2EIT {
             KeyGenerateAlgorithm keyGenerateAlgorithm = new 
UUIDKeyGenerateAlgorithm();
             // TODO Insert binary string in VARBINARY column. But 
KeyGenerateAlgorithm.generateKey() require returning Comparable, and byte[] is 
not Comparable
             byte[] uniqueKey = new byte[]{-1, 0, 1};
-            assertMigrationSuccess(containerComposer, sql, "order_id", 
keyGenerateAlgorithm, consistencyCheckAlgorithmType, unused -> 
insertOneOrder(containerComposer, uniqueKey));
+            assertMigrationSuccess(containerComposer, sql, "order_id", 
keyGenerateAlgorithm, consistencyCheckAlgorithmType, dataSource -> {
+                insertOneOrder(containerComposer, uniqueKey);
+                // TODO Select by byte[] from proxy doesn't work, so unhex 
function is used for now
+                containerComposer.assertOrderRecordExist(dataSource, 
String.format("SELECT 1 FROM t_order WHERE order_id=UNHEX('%s')", 
Hex.encodeHexString(uniqueKey)));
+            });
         }
     }
     
     private void assertMigrationSuccess(final PipelineContainerComposer 
containerComposer, final String sqlPattern, final String shardingColumn, final 
KeyGenerateAlgorithm keyGenerateAlgorithm,
-                                        final String 
consistencyCheckAlgorithmType, final Consumer<Void> incrementalTaskFn) throws 
Exception {
+                                        final String 
consistencyCheckAlgorithmType, final Consumer<DataSource> incrementalTaskFn) 
throws Exception {
         containerComposer.sourceExecuteWithLog(String.format(sqlPattern, 
SOURCE_TABLE_NAME));
         try (Connection connection = 
containerComposer.getSourceDataSource().getConnection()) {
             
PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, 
keyGenerateAlgorithm, SOURCE_TABLE_NAME, 
PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
@@ -232,12 +241,14 @@ class IndexesMigrationE2EIT extends 
AbstractMigrationE2EIT {
         startMigration(containerComposer, SOURCE_TABLE_NAME, 
TARGET_TABLE_NAME);
         String jobId = listJobId(containerComposer).get(0);
         containerComposer.waitJobPrepareSuccess(String.format("SHOW MIGRATION 
STATUS '%s'", jobId));
-        incrementalTaskFn.accept(null);
+        DataSource jdbcDataSource = 
containerComposer.generateShardingSphereDataSourceFromProxy();
+        incrementalTaskFn.accept(jdbcDataSource);
         containerComposer.waitIncrementTaskFinished(String.format("SHOW 
MIGRATION STATUS '%s'", jobId));
         if (null != consistencyCheckAlgorithmType) {
             assertCheckMigrationSuccess(containerComposer, jobId, 
consistencyCheckAlgorithmType);
         }
         commitMigrationByJobId(containerComposer, jobId);
+        
assertThat(containerComposer.getTargetTableRecordsCount(jdbcDataSource, 
SOURCE_TABLE_NAME), is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1));
         List<String> lastJobIds = listJobId(containerComposer);
         assertTrue(lastJobIds.isEmpty());
     }

Reply via email to