This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 4820b3c3a76 Update pipeline E2E code to fit Java 17 (#37906)
4820b3c3a76 is described below
commit 4820b3c3a76cfae292e5b21adbf0412c35173cf4
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Jan 30 19:13:52 2026 +0800
Update pipeline E2E code to fit Java 17 (#37906)
* Replace collect(Collectors.toList()) to toList()
* Replace to text block
* Replace to enhanced switch
* Replace to String.repeat
* Replace deprecated commons-lang3 RandomUtils
---
.../pipeline/cases/PipelineContainerComposer.java | 3 +-
.../cases/cdc/DataSourceRecordConsumer.java | 46 ++++++++++------------
.../general/PostgreSQLToMySQLMigrationE2EIT.java | 10 ++---
.../primarykey/IndexesMigrationE2EIT.java | 12 +++---
.../framework/param/PipelineE2ECondition.java | 3 +-
.../util/AutoIncrementKeyGenerateAlgorithm.java | 3 +-
.../pipeline/util/PipelineE2EDistSQLFacade.java | 16 ++++----
.../operation/pipeline/util/SQLBuilderUtils.java | 4 +-
8 files changed, 46 insertions(+), 51 deletions(-)
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
index e571ebfb887..f2578a2d04b 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
@@ -71,7 +71,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -259,7 +258,7 @@ public final class PipelineContainerComposer implements
AutoCloseable {
* @return storage units names
*/
public List<String> showStorageUnitsName() {
- List<String> result = queryForListWithLog(proxyDataSource, "SHOW
STORAGE UNITS").stream().map(each ->
String.valueOf(each.get("name"))).collect(Collectors.toList());
+ List<String> result = queryForListWithLog(proxyDataSource, "SHOW
STORAGE UNITS").stream().map(each -> String.valueOf(each.get("name"))).toList();
log.info("Show storage units name: {}", result);
return result;
}
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/DataSourceRecordConsumer.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/DataSourceRecordConsumer.java
index e479cce1002..7eab156d7db 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/DataSourceRecordConsumer.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/DataSourceRecordConsumer.java
@@ -46,7 +46,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
-import java.util.stream.Collectors;
@Slf4j
public final class DataSourceRecordConsumer implements Consumer<List<Record>> {
@@ -97,7 +96,7 @@ public final class DataSourceRecordConsumer implements
Consumer<List<Record>> {
Record firstRecord = records.get(0);
MetaData metaData = firstRecord.getMetaData();
PipelineTableMetaData tableMetaData =
loadTableMetaData(metaData.getSchema(), metaData.getTable());
- List<String> columnNames =
firstRecord.getAfterList().stream().map(TableColumn::getName).collect(Collectors.toList());
+ List<String> columnNames =
firstRecord.getAfterList().stream().map(TableColumn::getName).toList();
String tableName = buildTableNameWithSchema(metaData.getSchema(),
metaData.getTable());
String insertSQL = SQLBuilderUtils.buildInsertSQL(columnNames,
tableName);
try (PreparedStatement preparedStatement =
connection.prepareStatement(insertSQL)) {
@@ -127,7 +126,7 @@ public final class DataSourceRecordConsumer implements
Consumer<List<Record>> {
updateCount = preparedStatement.executeUpdate();
if (1 != updateCount || printSQL) {
log.info("Execute insert, update count: {}, sql: {},
values: {}", updateCount, sql,
-
ingestedRecord.getAfterList().stream().map(each ->
convertValueFromAny(tableMetaData, each)).collect(Collectors.toList()));
+
ingestedRecord.getAfterList().stream().map(each ->
convertValueFromAny(tableMetaData, each)).toList());
}
break;
case UPDATE:
@@ -139,7 +138,7 @@ public final class DataSourceRecordConsumer implements
Consumer<List<Record>> {
updateCount = preparedStatement.executeUpdate();
if (1 != updateCount || printSQL) {
log.info("Execute update, update count: {}, sql: {},
values: {}", updateCount, sql,
-
ingestedRecord.getAfterList().stream().map(each ->
convertValueFromAny(tableMetaData, each)).collect(Collectors.toList()));
+
ingestedRecord.getAfterList().stream().map(each ->
convertValueFromAny(tableMetaData, each)).toList());
}
break;
case DELETE:
@@ -170,19 +169,15 @@ public final class DataSourceRecordConsumer implements
Consumer<List<Record>> {
}
private String buildSQL(final Record ingestedRecord) {
- List<String> columnNames =
ingestedRecord.getAfterList().stream().map(TableColumn::getName).collect(Collectors.toList());
+ List<String> columnNames =
ingestedRecord.getAfterList().stream().map(TableColumn::getName).toList();
MetaData metaData = ingestedRecord.getMetaData();
String tableName = buildTableNameWithSchema(metaData.getSchema(),
metaData.getTable());
- switch (ingestedRecord.getDataChangeType()) {
- case INSERT:
- return SQLBuilderUtils.buildInsertSQL(columnNames, tableName);
- case UPDATE:
- return SQLBuilderUtils.buildUpdateSQL(columnNames, tableName,
"?");
- case DELETE:
- return SQLBuilderUtils.buildDeleteSQL(tableName, "order_id");
- default:
- throw new UnsupportedOperationException("");
- }
+ return switch (ingestedRecord.getDataChangeType()) {
+ case INSERT -> SQLBuilderUtils.buildInsertSQL(columnNames,
tableName);
+ case UPDATE -> SQLBuilderUtils.buildUpdateSQL(columnNames,
tableName, "?");
+ case DELETE -> SQLBuilderUtils.buildDeleteSQL(tableName,
"order_id");
+ default -> throw new UnsupportedOperationException("");
+ };
}
private TableColumn getOrderIdTableColumn(final List<TableColumn>
tableColumns) {
@@ -202,21 +197,22 @@ public final class DataSourceRecordConsumer implements
Consumer<List<Record>> {
if (null == result) {
return null;
}
- switch (columnMetaData.getDataType()) {
- case Types.TIME:
+ return switch (columnMetaData.getDataType()) {
+ case Types.TIME -> {
if ("TIME".equalsIgnoreCase(columnMetaData.getDataTypeName()))
{
// Time.valueOf() will lose nanos
- return LocalTime.ofNanoOfDay((Long) result);
+ yield LocalTime.ofNanoOfDay((Long) result);
}
- return result;
- case Types.DATE:
+ yield result;
+ }
+ case Types.DATE -> {
if ("DATE".equalsIgnoreCase(columnMetaData.getDataTypeName()))
{
LocalDate localDate = LocalDate.ofEpochDay((Long) result);
- return Date.valueOf(localDate);
+ yield Date.valueOf(localDate);
}
- return result;
- default:
- return result;
- }
+ yield result;
+ }
+ default -> result;
+ };
}
}
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
index a965614d3ba..c477b1e27b7 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
@@ -19,7 +19,6 @@ package
org.apache.shardingsphere.test.e2e.operation.pipeline.cases.migration.ge
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.commons.lang3.RandomUtils;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.apache.shardingsphere.test.e2e.env.runtime.E2ETestEnvironment;
import org.apache.shardingsphere.test.e2e.env.runtime.type.RunEnvironment.Type;
@@ -48,6 +47,7 @@ import java.sql.SQLException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -115,7 +115,7 @@ class PostgreSQLToMySQLMigrationE2EIT extends
AbstractMigrationE2EIT {
preparedStatement.setObject(5, LocalDate.now());
preparedStatement.setObject(6, LocalTime.now().withNano(0));
preparedStatement.setObject(7, new byte[]{1, 2, 3, 4, 5});
- preparedStatement.setObject(8, new BigDecimal(i *
RandomUtils.nextInt(1, 100) + ".22"));
+ preparedStatement.setObject(8, new BigDecimal(i *
ThreadLocalRandom.current().nextInt(1, 100) + ".22"));
preparedStatement.addBatch();
}
preparedStatement.executeBatch();
@@ -140,7 +140,7 @@ class PostgreSQLToMySQLMigrationE2EIT extends
AbstractMigrationE2EIT {
try (Connection connection =
containerComposer.getProxyDataSource().getConnection()) {
connection.createStatement().execute("CREATE TABLE IF NOT EXISTS
t_order (order_id BIGINT PRIMARY KEY,user_id INT,status VARCHAR(32), c_datetime
DATETIME(6),c_date DATE,c_time TIME,"
+ "c_bytea BLOB,c_decimal DECIMAL(10,2))");
- if (waitForTableExistence(connection, "t_order")) {
+ if (waitForTableExistence(connection)) {
connection.createStatement().execute("TRUNCATE TABLE t_order");
} else {
throw new SQLException("Table t_order does not exist");
@@ -148,9 +148,9 @@ class PostgreSQLToMySQLMigrationE2EIT extends
AbstractMigrationE2EIT {
}
}
- private static boolean waitForTableExistence(final Connection connection,
final String tableName) {
+ private static boolean waitForTableExistence(final Connection connection) {
try {
- Awaitility.waitAtMost(60L,
TimeUnit.SECONDS).ignoreExceptions().pollInterval(3L,
TimeUnit.SECONDS).until(() -> tableExists(connection, tableName));
+ Awaitility.waitAtMost(60L,
TimeUnit.SECONDS).ignoreExceptions().pollInterval(3L,
TimeUnit.SECONDS).until(() -> tableExists(connection, "t_order"));
return true;
} catch (final ConditionTimeoutException ex) {
return false;
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index 4fbb466b515..40d6940076e 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -64,11 +64,13 @@ import static org.mockito.Mockito.mock;
@PipelineE2EDatabaseSettings(type = "PostgreSQL")})
class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
- private static final String ORDER_TABLE_SHARDING_RULE_FORMAT = "CREATE
SHARDING TABLE RULE t_order(\n"
- + "STORAGE_UNITS(ds_2,ds_3,ds_4),\n"
- + "SHARDING_COLUMN=%s,\n"
- + "TYPE(NAME=\"hash_mod\",PROPERTIES(\"sharding-count\"=\"6\"))\n"
- + ");";
+ private static final String ORDER_TABLE_SHARDING_RULE_FORMAT = """
+ CREATE SHARDING TABLE RULE t_order(
+ STORAGE_UNITS(ds_2,ds_3,ds_4),
+ SHARDING_COLUMN=%s,
+ TYPE(NAME="hash_mod",PROPERTIES("sharding-count"="6"))
+ )
+ """;
private static final String SOURCE_TABLE_NAME = "t_order";
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/framework/param/PipelineE2ECondition.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/framework/param/PipelineE2ECondition.java
index 7928c54e840..2fcd841d96e 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/framework/param/PipelineE2ECondition.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/framework/param/PipelineE2ECondition.java
@@ -27,7 +27,6 @@ import org.junit.jupiter.api.extension.ExtensionContext;
import java.util.Arrays;
import java.util.Collection;
-import java.util.stream.Collectors;
/**
* Pipeline E2E condition.
@@ -46,7 +45,7 @@ public final class PipelineE2ECondition {
return false;
}
PipelineE2ESettings settings =
context.getRequiredTestClass().getAnnotation(PipelineE2ESettings.class);
- Collection<DatabaseType> databaseTypes =
Arrays.stream(settings.database()).map(each ->
TypedSPILoader.getService(DatabaseType.class,
each.type())).collect(Collectors.toList());
+ Collection<DatabaseType> databaseTypes =
Arrays.stream(settings.database()).map(each ->
TypedSPILoader.getService(DatabaseType.class, each.type())).toList();
if (!databaseTypes.isEmpty() && Type.NATIVE ==
E2ETestEnvironment.getInstance().getRunEnvironment().getType()) {
return true;
}
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/AutoIncrementKeyGenerateAlgorithm.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/AutoIncrementKeyGenerateAlgorithm.java
index 0b687ca6471..a1ba5e07a6c 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/AutoIncrementKeyGenerateAlgorithm.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/AutoIncrementKeyGenerateAlgorithm.java
@@ -22,7 +22,6 @@ import
org.apache.shardingsphere.infra.algorithm.keygen.spi.KeyGenerateAlgorithm
import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
import java.util.stream.IntStream;
public final class AutoIncrementKeyGenerateAlgorithm implements
KeyGenerateAlgorithm {
@@ -31,7 +30,7 @@ public final class AutoIncrementKeyGenerateAlgorithm
implements KeyGenerateAlgor
@Override
public Collection<Integer> generateKeys(final AlgorithmSQLContext context,
final int keyGenerateCount) {
- return IntStream.range(0, keyGenerateCount).mapToObj(each ->
idGen.getAndIncrement()).collect(Collectors.toList());
+ return IntStream.range(0, keyGenerateCount).mapToObj(each ->
idGen.getAndIncrement()).toList();
}
@Override
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
index b748f3236ab..dbbf09a463d 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
@@ -47,10 +47,12 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@Slf4j
public final class PipelineE2EDistSQLFacade {
- private static final String PIPELINE_RULE_SQL_TEMPLATE = "ALTER %s RULE(\n"
- + "READ(WORKER_THREAD=20, BATCH_SIZE=1000, SHARDING_SIZE=100000,
RATE_LIMITER (TYPE(NAME='QPS',PROPERTIES('qps'='500')))),\n"
- + "WRITE(WORKER_THREAD=20, BATCH_SIZE=1000, RATE_LIMITER
(TYPE(NAME='TPS',PROPERTIES('tps'='2000')))),\n"
- + "STREAM_CHANNEL(TYPE(NAME='MEMORY',
PROPERTIES('block-queue-size'=1000))))";
+ private static final String PIPELINE_RULE_SQL_TEMPLATE = """
+ ALTER %s RULE(
+ READ(WORKER_THREAD=20, BATCH_SIZE=1000, SHARDING_SIZE=100000,
RATE_LIMITER (TYPE(NAME='QPS',PROPERTIES('qps'='500')))),
+ WRITE(WORKER_THREAD=20, BATCH_SIZE=1000, RATE_LIMITER
(TYPE(NAME='TPS',PROPERTIES('tps'='2000')))),
+ STREAM_CHANNEL(TYPE(NAME='MEMORY',
PROPERTIES('block-queue-size'=1000))))
+ """;
private final PipelineContainerComposer containerComposer;
@@ -112,7 +114,7 @@ public final class PipelineE2EDistSQLFacade {
* @return job ids
*/
public List<String> listJobIds() {
- return listJobs().stream().map(a ->
a.get("id").toString()).collect(Collectors.toList());
+ return listJobs().stream().map(a -> a.get("id").toString()).toList();
}
/**
@@ -219,7 +221,7 @@ public final class PipelineE2EDistSQLFacade {
for (int i = 0; i < 10; i++) {
List<Map<String, Object>> jobStatusRecords =
containerComposer.queryForListWithLog(sql);
log.info("Wait job Incremental stage started, job status records:
{}", jobStatusRecords);
- List<String> statusList = jobStatusRecords.stream().map(each ->
String.valueOf(each.get("status"))).collect(Collectors.toList());
+ List<String> statusList = jobStatusRecords.stream().map(each ->
String.valueOf(each.get("status"))).toList();
if (statusList.stream().allMatch(each ->
each.equals(jobStatus.name()))) {
return;
}
@@ -309,7 +311,7 @@ public final class PipelineE2EDistSQLFacade {
containerComposer.sleepSeconds(3);
continue;
}
- List<String> checkEndTimeList =
checkStatusRecords.stream().map(map ->
map.get("check_end_time").toString()).filter(each ->
!Strings.isNullOrEmpty(each)).collect(Collectors.toList());
+ List<String> checkEndTimeList =
checkStatusRecords.stream().map(map ->
map.get("check_end_time").toString()).filter(each ->
!Strings.isNullOrEmpty(each)).toList();
if (checkEndTimeList.size() == checkStatusRecords.size()) {
break;
} else {
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/SQLBuilderUtils.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/SQLBuilderUtils.java
index c52bd0105ed..77d825dd4e0 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/SQLBuilderUtils.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/SQLBuilderUtils.java
@@ -42,9 +42,7 @@ public final class SQLBuilderUtils {
}
result.setLength(result.length() - 1);
result.append(") ").append("VALUES").append("(");
- for (int i = 0; i < columnNames.size(); i++) {
- result.append("?,");
- }
+ result.append("?,".repeat(columnNames.size()));
result.setLength(result.length() - 1);
result.append(")");
return String.format(result.toString(), tableName);