This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new ef3ed41c552 Add PostgreSQLSlotManager (#32501)
ef3ed41c552 is described below
commit ef3ed41c552e46fc7aef74f26869965eef439a20
Author: Liang Zhang <[email protected]>
AuthorDate: Wed Aug 14 13:05:35 2024 +0800
Add PostgreSQLSlotManager (#32501)
* Add PostgreSQLIngestPositionCreator
* Add PostgreSQLSlotManager
---
.../ingest/OpenGaussIngestPositionManager.java | 65 ++-----------
.../opengauss/ingest/OpenGaussWALDumper.java | 2 +-
.../ingest/PostgreSQLIngestPositionManager.java | 73 +++------------
.../postgresql/ingest/PostgreSQLWALDumper.java | 1 +
.../ingest/slot/PostgreSQLSlotManager.java | 103 +++++++++++++++++++++
.../{ => slot}/PostgreSQLSlotNameGenerator.java | 4 +-
.../postgresql/ingest/PostgreSQLWALDumperTest.java | 1 +
.../PostgreSQLSlotNameGeneratorTest.java | 2 +-
8 files changed, 127 insertions(+), 124 deletions(-)
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussIngestPositionManager.java
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussIngestPositionManager.java
index 1163e8ccc5b..614162289eb 100644
---
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussIngestPositionManager.java
+++
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussIngestPositionManager.java
@@ -17,11 +17,9 @@
package org.apache.shardingsphere.data.pipeline.opengauss.ingest;
-import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIngestPositionManager;
import
org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.OpenGaussLogSequenceNumber;
-import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLSlotNameGenerator;
-import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.pojo.ReplicationSlotInfo;
+import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.slot.PostgreSQLSlotManager;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
import org.opengauss.replication.LogSequenceNumber;
@@ -30,19 +28,13 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Optional;
/**
* Ingest position manager for openGauss.
*/
-// TODO reuse PostgreSQLIngestPositionManager
-@Slf4j
public final class OpenGaussIngestPositionManager implements
DialectIngestPositionManager {
- private static final String DECODE_PLUGIN = "mppdb_decoding";
-
- private static final String DUPLICATE_OBJECT_ERROR_CODE = "42710";
+ private final PostgreSQLSlotManager slotManager = new
PostgreSQLSlotManager("mppdb_decoding");
@Override
public WALPosition init(final String data) {
@@ -52,60 +44,15 @@ public final class OpenGaussIngestPositionManager
implements DialectIngestPositi
@Override
public WALPosition init(final DataSource dataSource, final String
slotNameSuffix) throws SQLException {
try (Connection connection = dataSource.getConnection()) {
- createSlotIfNotExist(connection,
PostgreSQLSlotNameGenerator.getUniqueSlotName(connection, slotNameSuffix));
+ slotManager.create(connection, slotNameSuffix);
return getWALPosition(connection);
}
}
- private void createSlotIfNotExist(final Connection connection, final
String slotName) throws SQLException {
- Optional<ReplicationSlotInfo> slotInfo = getSlotInfo(connection,
slotName);
- if (!slotInfo.isPresent()) {
- createSlot(connection, slotName);
- return;
- }
- if (null == slotInfo.get().getDatabaseName()) {
- dropSlotIfExist(connection, slotName);
- createSlot(connection, slotName);
- }
- }
-
- private Optional<ReplicationSlotInfo> getSlotInfo(final Connection
connection, final String slotName) throws SQLException {
- String sql = "SELECT slot_name, database FROM pg_replication_slots
WHERE slot_name=? AND plugin=?";
- try (PreparedStatement preparedStatement =
connection.prepareStatement(sql)) {
- preparedStatement.setString(1, slotName);
- preparedStatement.setString(2, DECODE_PLUGIN);
- try (ResultSet resultSet = preparedStatement.executeQuery()) {
- return resultSet.next() ? Optional.of(new
ReplicationSlotInfo(resultSet.getString(1), resultSet.getString(2))) :
Optional.empty();
- }
- }
- }
-
- private void createSlot(final Connection connection, final String
slotName) throws SQLException {
- try (PreparedStatement preparedStatement =
connection.prepareStatement("SELECT * FROM
pg_create_logical_replication_slot(?, ?)")) {
- preparedStatement.setString(1, slotName);
- preparedStatement.setString(2, DECODE_PLUGIN);
- preparedStatement.execute();
- } catch (final SQLException ex) {
- if (!DUPLICATE_OBJECT_ERROR_CODE.equals(ex.getSQLState())) {
- throw ex;
- }
- }
- }
-
- private void dropSlotIfExist(final Connection connection, final String
slotName) throws SQLException {
- if (!getSlotInfo(connection, slotName).isPresent()) {
- return;
- }
- try (PreparedStatement preparedStatement =
connection.prepareStatement("SELECT pg_drop_replication_slot(?)")) {
- preparedStatement.setString(1, slotName);
- preparedStatement.execute();
- }
- }
-
private WALPosition getWALPosition(final Connection connection) throws
SQLException {
try (
- Statement statement = connection.createStatement();
- ResultSet resultSet = statement.executeQuery("SELECT
PG_CURRENT_XLOG_LOCATION()")) {
+ PreparedStatement preparedStatement =
connection.prepareStatement("SELECT PG_CURRENT_XLOG_LOCATION()");
+ ResultSet resultSet = preparedStatement.executeQuery()) {
resultSet.next();
return new WALPosition(new
OpenGaussLogSequenceNumber(LogSequenceNumber.valueOf(resultSet.getString(1))));
}
@@ -114,7 +61,7 @@ public final class OpenGaussIngestPositionManager implements
DialectIngestPositi
@Override
public void destroy(final DataSource dataSource, final String
slotNameSuffix) throws SQLException {
try (Connection connection = dataSource.getConnection()) {
- dropSlotIfExist(connection,
PostgreSQLSlotNameGenerator.getUniqueSlotName(connection, slotNameSuffix));
+ slotManager.dropIfExisted(connection, slotNameSuffix);
}
}
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
index dd142ff8278..54918dc2c7b 100644
---
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
+++
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
@@ -32,7 +32,7 @@ import
org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.OpenGaussLog
import
org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.MppdbDecodingPlugin;
import
org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.OpenGaussLogSequenceNumber;
import
org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.OpenGaussTimestampUtils;
-import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLSlotNameGenerator;
+import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.slot.PostgreSQLSlotNameGenerator;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALEventConverter;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.DecodingPlugin;
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManager.java
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManager.java
index 82e4c507719..b5fa2e8dbed 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManager.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManager.java
@@ -17,30 +17,26 @@
package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
-import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIngestPositionManager;
-import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.pojo.ReplicationSlotInfo;
+import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.slot.PostgreSQLSlotManager;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
import org.postgresql.replication.LogSequenceNumber;
import javax.sql.DataSource;
import java.sql.Connection;
+import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.util.Optional;
/**
* Ingest position manager for PostgreSQL.
*/
-@Slf4j
public final class PostgreSQLIngestPositionManager implements
DialectIngestPositionManager {
- private static final String DECODE_PLUGIN = "test_decoding";
-
- private static final String DUPLICATE_OBJECT_ERROR_CODE = "42710";
+ private final PostgreSQLSlotManager slotManager = new
PostgreSQLSlotManager("test_decoding");
@Override
public WALPosition init(final String data) {
@@ -50,79 +46,34 @@ public final class PostgreSQLIngestPositionManager
implements DialectIngestPosit
@Override
public WALPosition init(final DataSource dataSource, final String
slotNameSuffix) throws SQLException {
try (Connection connection = dataSource.getConnection()) {
- createSlotIfNotExist(connection,
PostgreSQLSlotNameGenerator.getUniqueSlotName(connection, slotNameSuffix));
- return getWALPosition(connection);
- }
- }
-
- private void createSlotIfNotExist(final Connection connection, final
String slotName) throws SQLException {
- Optional<ReplicationSlotInfo> slotInfo = getSlotInfo(connection,
slotName);
- if (!slotInfo.isPresent()) {
- createSlot(connection, slotName);
- return;
- }
- if (null == slotInfo.get().getDatabaseName()) {
- dropSlotIfExist(connection, slotName);
- createSlot(connection, slotName);
- }
- }
-
- private Optional<ReplicationSlotInfo> getSlotInfo(final Connection
connection, final String slotName) throws SQLException {
- String sql = "SELECT slot_name, database FROM pg_replication_slots
WHERE slot_name=? AND plugin=?";
- try (PreparedStatement preparedStatement =
connection.prepareStatement(sql)) {
- preparedStatement.setString(1, slotName);
- preparedStatement.setString(2, DECODE_PLUGIN);
- try (ResultSet resultSet = preparedStatement.executeQuery()) {
- return resultSet.next() ? Optional.of(new
ReplicationSlotInfo(resultSet.getString(1), resultSet.getString(2))) :
Optional.empty();
- }
- }
- }
-
- private void createSlot(final Connection connection, final String
slotName) throws SQLException {
- try (PreparedStatement preparedStatement =
connection.prepareStatement("SELECT * FROM
pg_create_logical_replication_slot(?, ?)")) {
- preparedStatement.setString(1, slotName);
- preparedStatement.setString(2, DECODE_PLUGIN);
- preparedStatement.execute();
- } catch (final SQLException ex) {
- if (!DUPLICATE_OBJECT_ERROR_CODE.equals(ex.getSQLState())) {
- throw ex;
- }
- }
- }
-
- private void dropSlotIfExist(final Connection connection, final String
slotName) throws SQLException {
- if (!getSlotInfo(connection, slotName).isPresent()) {
- return;
- }
- try (PreparedStatement preparedStatement =
connection.prepareStatement("SELECT pg_drop_replication_slot(?)")) {
- preparedStatement.setString(1, slotName);
- preparedStatement.execute();
+ slotManager.create(connection, slotNameSuffix);
+ return getWALPosition(connection,
getLogSequenceNumberSQL(connection.getMetaData()));
}
}
- private WALPosition getWALPosition(final Connection connection) throws
SQLException {
+ private WALPosition getWALPosition(final Connection connection, final
String logSequenceNumberSQL) throws SQLException {
try (
- PreparedStatement preparedStatement =
connection.prepareStatement(getLogSequenceNumberSQL(connection));
+ PreparedStatement preparedStatement =
connection.prepareStatement(logSequenceNumberSQL);
ResultSet resultSet = preparedStatement.executeQuery()) {
resultSet.next();
return new WALPosition(new
PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(resultSet.getString(1))));
}
}
- private String getLogSequenceNumberSQL(final Connection connection) throws
SQLException {
- if (9 == connection.getMetaData().getDatabaseMajorVersion() && 6 <=
connection.getMetaData().getDatabaseMinorVersion()) {
+ private String getLogSequenceNumberSQL(final DatabaseMetaData metaData)
throws SQLException {
+ if (9 == metaData.getDatabaseMajorVersion() && 6 <=
metaData.getDatabaseMinorVersion()) {
return "SELECT PG_CURRENT_XLOG_LOCATION()";
}
- if (10 <= connection.getMetaData().getDatabaseMajorVersion()) {
+ if (10 <= metaData.getDatabaseMajorVersion()) {
return "SELECT PG_CURRENT_WAL_LSN()";
}
- throw new PipelineInternalException("Unsupported PostgreSQL version: "
+ connection.getMetaData().getDatabaseProductVersion());
+ throw new PipelineInternalException("Unsupported PostgreSQL version: "
+ metaData.getDatabaseProductVersion());
}
@Override
public void destroy(final DataSource dataSource, final String
slotNameSuffix) throws SQLException {
try (Connection connection = dataSource.getConnection()) {
- dropSlotIfExist(connection,
PostgreSQLSlotNameGenerator.getUniqueSlotName(connection, slotNameSuffix));
+ slotManager.dropIfExisted(connection, slotNameSuffix);
}
}
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
index 5674040f903..7efb7e9d27a 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
@@ -28,6 +28,7 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPositi
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
+import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.slot.PostgreSQLSlotNameGenerator;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.PostgreSQLLogicalReplication;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALEventConverter;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/slot/PostgreSQLSlotManager.java
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/slot/PostgreSQLSlotManager.java
new file mode 100644
index 00000000000..e0d08e82ee9
--- /dev/null
+++
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/slot/PostgreSQLSlotManager.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.postgresql.ingest.slot;
+
+import lombok.RequiredArgsConstructor;
+import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.pojo.ReplicationSlotInfo;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Optional;
+
+/**
+ * PostgreSQL slot manager.
+ */
+@RequiredArgsConstructor
+public final class PostgreSQLSlotManager {
+
+ private static final String DUPLICATE_OBJECT_ERROR_CODE = "42710";
+
+ private final String decodePlugin;
+
+ /**
+ * Create slot.
+ *
+ * @param connection connection
+ * @param slotNameSuffix slot name suffix
+ * @throws SQLException SQL exception
+ */
+ public void create(final Connection connection, final String
slotNameSuffix) throws SQLException {
+ String slotName =
PostgreSQLSlotNameGenerator.getUniqueSlotName(connection, slotNameSuffix);
+ Optional<ReplicationSlotInfo> slotInfo = load(connection, slotName);
+ if (!slotInfo.isPresent()) {
+ doCreate(connection, slotName);
+ return;
+ }
+ if (null == slotInfo.get().getDatabaseName()) {
+ doDrop(connection, slotName);
+ doCreate(connection, slotName);
+ }
+ }
+
+ private Optional<ReplicationSlotInfo> load(final Connection connection,
final String slotName) throws SQLException {
+ String sql = "SELECT slot_name, database FROM pg_replication_slots
WHERE slot_name=? AND plugin=?";
+ try (PreparedStatement preparedStatement =
connection.prepareStatement(sql)) {
+ preparedStatement.setString(1, slotName);
+ preparedStatement.setString(2, decodePlugin);
+ try (ResultSet resultSet = preparedStatement.executeQuery()) {
+ return resultSet.next() ? Optional.of(new
ReplicationSlotInfo(resultSet.getString(1), resultSet.getString(2))) :
Optional.empty();
+ }
+ }
+ }
+
+ private void doCreate(final Connection connection, final String slotName)
throws SQLException {
+ try (PreparedStatement preparedStatement =
connection.prepareStatement("SELECT * FROM
pg_create_logical_replication_slot(?, ?)")) {
+ preparedStatement.setString(1, slotName);
+ preparedStatement.setString(2, decodePlugin);
+ preparedStatement.execute();
+ } catch (final SQLException ex) {
+ if (!DUPLICATE_OBJECT_ERROR_CODE.equals(ex.getSQLState())) {
+ throw ex;
+ }
+ }
+ }
+
+ /**
+ * Drop slot if existed.
+ *
+ * @param connection connection
+ * @param slotNameSuffix slot name suffix
+ * @throws SQLException SQL exception
+ */
+ public void dropIfExisted(final Connection connection, final String
slotNameSuffix) throws SQLException {
+ String slotName =
PostgreSQLSlotNameGenerator.getUniqueSlotName(connection, slotNameSuffix);
+ if (!load(connection, slotName).isPresent()) {
+ return;
+ }
+ doDrop(connection, slotName);
+ }
+
+ private void doDrop(final Connection connection, final String slotName)
throws SQLException {
+ try (PreparedStatement preparedStatement =
connection.prepareStatement("SELECT pg_drop_replication_slot(?)")) {
+ preparedStatement.setString(1, slotName);
+ preparedStatement.execute();
+ }
+ }
+}
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLSlotNameGenerator.java
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/slot/PostgreSQLSlotNameGenerator.java
similarity index 97%
rename from
kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLSlotNameGenerator.java
rename to
kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/slot/PostgreSQLSlotNameGenerator.java
index ab84f236d69..8d9f64bd64e 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLSlotNameGenerator.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/slot/PostgreSQLSlotNameGenerator.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
+package org.apache.shardingsphere.data.pipeline.postgresql.ingest.slot;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
@@ -37,7 +37,7 @@ public final class PostgreSQLSlotNameGenerator {
*
* @param connection connection
* @param slotNameSuffix slot name suffix
- * @return unique name by connection
+ * @return unique slot name
* @throws SQLException failed when get catalog
*/
public static String getUniqueSlotName(final Connection connection, final
String slotNameSuffix) throws SQLException {
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
index 9187cb58d9f..1978c634098 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
@@ -26,6 +26,7 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.In
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.ActualAndLogicTableNameMapper;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
+import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.slot.PostgreSQLSlotNameGenerator;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.PostgreSQLLogicalReplication;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLSlotNameGeneratorTest.java
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/slot/PostgreSQLSlotNameGeneratorTest.java
similarity index 99%
rename from
kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLSlotNameGeneratorTest.java
rename to
kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/slot/PostgreSQLSlotNameGeneratorTest.java
index 7db70a30e6f..5b3acbf9184 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLSlotNameGeneratorTest.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/slot/PostgreSQLSlotNameGeneratorTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
+package org.apache.shardingsphere.data.pipeline.postgresql.ingest.slot;
import org.junit.jupiter.api.Test;