This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new ef3ed41c552 Add PostgreSQLSlotManager (#32501)
ef3ed41c552 is described below

commit ef3ed41c552e46fc7aef74f26869965eef439a20
Author: Liang Zhang <[email protected]>
AuthorDate: Wed Aug 14 13:05:35 2024 +0800

    Add PostgreSQLSlotManager (#32501)
    
    * Add PostgreSQLIngestPositionCreator
    
    * Add PostgreSQLSlotManager
---
 .../ingest/OpenGaussIngestPositionManager.java     |  65 ++-----------
 .../opengauss/ingest/OpenGaussWALDumper.java       |   2 +-
 .../ingest/PostgreSQLIngestPositionManager.java    |  73 +++------------
 .../postgresql/ingest/PostgreSQLWALDumper.java     |   1 +
 .../ingest/slot/PostgreSQLSlotManager.java         | 103 +++++++++++++++++++++
 .../{ => slot}/PostgreSQLSlotNameGenerator.java    |   4 +-
 .../postgresql/ingest/PostgreSQLWALDumperTest.java |   1 +
 .../PostgreSQLSlotNameGeneratorTest.java           |   2 +-
 8 files changed, 127 insertions(+), 124 deletions(-)

diff --git 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussIngestPositionManager.java
 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussIngestPositionManager.java
index 1163e8ccc5b..614162289eb 100644
--- 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussIngestPositionManager.java
+++ 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussIngestPositionManager.java
@@ -17,11 +17,9 @@
 
 package org.apache.shardingsphere.data.pipeline.opengauss.ingest;
 
-import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIngestPositionManager;
 import 
org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.OpenGaussLogSequenceNumber;
-import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLSlotNameGenerator;
-import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.pojo.ReplicationSlotInfo;
+import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.slot.PostgreSQLSlotManager;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
 import org.opengauss.replication.LogSequenceNumber;
 
@@ -30,19 +28,13 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Optional;
 
 /**
  * Ingest position manager for openGauss.
  */
-// TODO reuse PostgreSQLIngestPositionManager
-@Slf4j
 public final class OpenGaussIngestPositionManager implements 
DialectIngestPositionManager {
     
-    private static final String DECODE_PLUGIN = "mppdb_decoding";
-    
-    private static final String DUPLICATE_OBJECT_ERROR_CODE = "42710";
+    private final PostgreSQLSlotManager slotManager = new 
PostgreSQLSlotManager("mppdb_decoding");
     
     @Override
     public WALPosition init(final String data) {
@@ -52,60 +44,15 @@ public final class OpenGaussIngestPositionManager 
implements DialectIngestPositi
     @Override
     public WALPosition init(final DataSource dataSource, final String 
slotNameSuffix) throws SQLException {
         try (Connection connection = dataSource.getConnection()) {
-            createSlotIfNotExist(connection, 
PostgreSQLSlotNameGenerator.getUniqueSlotName(connection, slotNameSuffix));
+            slotManager.create(connection, slotNameSuffix);
             return getWALPosition(connection);
         }
     }
     
-    private void createSlotIfNotExist(final Connection connection, final 
String slotName) throws SQLException {
-        Optional<ReplicationSlotInfo> slotInfo = getSlotInfo(connection, 
slotName);
-        if (!slotInfo.isPresent()) {
-            createSlot(connection, slotName);
-            return;
-        }
-        if (null == slotInfo.get().getDatabaseName()) {
-            dropSlotIfExist(connection, slotName);
-            createSlot(connection, slotName);
-        }
-    }
-    
-    private Optional<ReplicationSlotInfo> getSlotInfo(final Connection 
connection, final String slotName) throws SQLException {
-        String sql = "SELECT slot_name, database FROM pg_replication_slots 
WHERE slot_name=? AND plugin=?";
-        try (PreparedStatement preparedStatement = 
connection.prepareStatement(sql)) {
-            preparedStatement.setString(1, slotName);
-            preparedStatement.setString(2, DECODE_PLUGIN);
-            try (ResultSet resultSet = preparedStatement.executeQuery()) {
-                return resultSet.next() ? Optional.of(new 
ReplicationSlotInfo(resultSet.getString(1), resultSet.getString(2))) : 
Optional.empty();
-            }
-        }
-    }
-    
-    private void createSlot(final Connection connection, final String 
slotName) throws SQLException {
-        try (PreparedStatement preparedStatement = 
connection.prepareStatement("SELECT * FROM 
pg_create_logical_replication_slot(?, ?)")) {
-            preparedStatement.setString(1, slotName);
-            preparedStatement.setString(2, DECODE_PLUGIN);
-            preparedStatement.execute();
-        } catch (final SQLException ex) {
-            if (!DUPLICATE_OBJECT_ERROR_CODE.equals(ex.getSQLState())) {
-                throw ex;
-            }
-        }
-    }
-    
-    private void dropSlotIfExist(final Connection connection, final String 
slotName) throws SQLException {
-        if (!getSlotInfo(connection, slotName).isPresent()) {
-            return;
-        }
-        try (PreparedStatement preparedStatement = 
connection.prepareStatement("SELECT pg_drop_replication_slot(?)")) {
-            preparedStatement.setString(1, slotName);
-            preparedStatement.execute();
-        }
-    }
-    
     private WALPosition getWALPosition(final Connection connection) throws 
SQLException {
         try (
-                Statement statement = connection.createStatement();
-                ResultSet resultSet = statement.executeQuery("SELECT 
PG_CURRENT_XLOG_LOCATION()")) {
+                PreparedStatement preparedStatement = 
connection.prepareStatement("SELECT PG_CURRENT_XLOG_LOCATION()");
+                ResultSet resultSet = preparedStatement.executeQuery()) {
             resultSet.next();
             return new WALPosition(new 
OpenGaussLogSequenceNumber(LogSequenceNumber.valueOf(resultSet.getString(1))));
         }
@@ -114,7 +61,7 @@ public final class OpenGaussIngestPositionManager implements 
DialectIngestPositi
     @Override
     public void destroy(final DataSource dataSource, final String 
slotNameSuffix) throws SQLException {
         try (Connection connection = dataSource.getConnection()) {
-            dropSlotIfExist(connection, 
PostgreSQLSlotNameGenerator.getUniqueSlotName(connection, slotNameSuffix));
+            slotManager.dropIfExisted(connection, slotNameSuffix);
         }
     }
     
diff --git 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
index dd142ff8278..54918dc2c7b 100644
--- 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
+++ 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussWALDumper.java
@@ -32,7 +32,7 @@ import 
org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.OpenGaussLog
 import 
org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.MppdbDecodingPlugin;
 import 
org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.OpenGaussLogSequenceNumber;
 import 
org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.OpenGaussTimestampUtils;
-import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.PostgreSQLSlotNameGenerator;
+import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.slot.PostgreSQLSlotNameGenerator;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALEventConverter;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.DecodingPlugin;
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManager.java
 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManager.java
index 82e4c507719..b5fa2e8dbed 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManager.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManager.java
@@ -17,30 +17,26 @@
 
 package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
 
-import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIngestPositionManager;
-import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.pojo.ReplicationSlotInfo;
+import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.slot.PostgreSQLSlotManager;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
 import org.postgresql.replication.LogSequenceNumber;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
+import java.sql.DatabaseMetaData;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.Optional;
 
 /**
  * Ingest position manager for PostgreSQL.
  */
-@Slf4j
 public final class PostgreSQLIngestPositionManager implements 
DialectIngestPositionManager {
     
-    private static final String DECODE_PLUGIN = "test_decoding";
-    
-    private static final String DUPLICATE_OBJECT_ERROR_CODE = "42710";
+    private final PostgreSQLSlotManager slotManager = new 
PostgreSQLSlotManager("test_decoding");
     
     @Override
     public WALPosition init(final String data) {
@@ -50,79 +46,34 @@ public final class PostgreSQLIngestPositionManager 
implements DialectIngestPosit
     @Override
     public WALPosition init(final DataSource dataSource, final String 
slotNameSuffix) throws SQLException {
         try (Connection connection = dataSource.getConnection()) {
-            createSlotIfNotExist(connection, 
PostgreSQLSlotNameGenerator.getUniqueSlotName(connection, slotNameSuffix));
-            return getWALPosition(connection);
-        }
-    }
-    
-    private void createSlotIfNotExist(final Connection connection, final 
String slotName) throws SQLException {
-        Optional<ReplicationSlotInfo> slotInfo = getSlotInfo(connection, 
slotName);
-        if (!slotInfo.isPresent()) {
-            createSlot(connection, slotName);
-            return;
-        }
-        if (null == slotInfo.get().getDatabaseName()) {
-            dropSlotIfExist(connection, slotName);
-            createSlot(connection, slotName);
-        }
-    }
-    
-    private Optional<ReplicationSlotInfo> getSlotInfo(final Connection 
connection, final String slotName) throws SQLException {
-        String sql = "SELECT slot_name, database FROM pg_replication_slots 
WHERE slot_name=? AND plugin=?";
-        try (PreparedStatement preparedStatement = 
connection.prepareStatement(sql)) {
-            preparedStatement.setString(1, slotName);
-            preparedStatement.setString(2, DECODE_PLUGIN);
-            try (ResultSet resultSet = preparedStatement.executeQuery()) {
-                return resultSet.next() ? Optional.of(new 
ReplicationSlotInfo(resultSet.getString(1), resultSet.getString(2))) : 
Optional.empty();
-            }
-        }
-    }
-    
-    private void createSlot(final Connection connection, final String 
slotName) throws SQLException {
-        try (PreparedStatement preparedStatement = 
connection.prepareStatement("SELECT * FROM 
pg_create_logical_replication_slot(?, ?)")) {
-            preparedStatement.setString(1, slotName);
-            preparedStatement.setString(2, DECODE_PLUGIN);
-            preparedStatement.execute();
-        } catch (final SQLException ex) {
-            if (!DUPLICATE_OBJECT_ERROR_CODE.equals(ex.getSQLState())) {
-                throw ex;
-            }
-        }
-    }
-    
-    private void dropSlotIfExist(final Connection connection, final String 
slotName) throws SQLException {
-        if (!getSlotInfo(connection, slotName).isPresent()) {
-            return;
-        }
-        try (PreparedStatement preparedStatement = 
connection.prepareStatement("SELECT pg_drop_replication_slot(?)")) {
-            preparedStatement.setString(1, slotName);
-            preparedStatement.execute();
+            slotManager.create(connection, slotNameSuffix);
+            return getWALPosition(connection, 
getLogSequenceNumberSQL(connection.getMetaData()));
         }
     }
     
-    private WALPosition getWALPosition(final Connection connection) throws 
SQLException {
+    private WALPosition getWALPosition(final Connection connection, final 
String logSequenceNumberSQL) throws SQLException {
         try (
-                PreparedStatement preparedStatement = 
connection.prepareStatement(getLogSequenceNumberSQL(connection));
+                PreparedStatement preparedStatement = 
connection.prepareStatement(logSequenceNumberSQL);
                 ResultSet resultSet = preparedStatement.executeQuery()) {
             resultSet.next();
             return new WALPosition(new 
PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(resultSet.getString(1))));
         }
     }
     
-    private String getLogSequenceNumberSQL(final Connection connection) throws 
SQLException {
-        if (9 == connection.getMetaData().getDatabaseMajorVersion() && 6 <= 
connection.getMetaData().getDatabaseMinorVersion()) {
+    private String getLogSequenceNumberSQL(final DatabaseMetaData metaData) 
throws SQLException {
+        if (9 == metaData.getDatabaseMajorVersion() && 6 <= 
metaData.getDatabaseMinorVersion()) {
             return "SELECT PG_CURRENT_XLOG_LOCATION()";
         }
-        if (10 <= connection.getMetaData().getDatabaseMajorVersion()) {
+        if (10 <= metaData.getDatabaseMajorVersion()) {
             return "SELECT PG_CURRENT_WAL_LSN()";
         }
-        throw new PipelineInternalException("Unsupported PostgreSQL version: " 
+ connection.getMetaData().getDatabaseProductVersion());
+        throw new PipelineInternalException("Unsupported PostgreSQL version: " 
+ metaData.getDatabaseProductVersion());
     }
     
     @Override
     public void destroy(final DataSource dataSource, final String 
slotNameSuffix) throws SQLException {
         try (Connection connection = dataSource.getConnection()) {
-            dropSlotIfExist(connection, 
PostgreSQLSlotNameGenerator.getUniqueSlotName(connection, slotNameSuffix));
+            slotManager.dropIfExisted(connection, slotNameSuffix);
         }
     }
     
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
index 5674040f903..7efb7e9d27a 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumper.java
@@ -28,6 +28,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPositi
 import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
+import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.slot.PostgreSQLSlotNameGenerator;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.PostgreSQLLogicalReplication;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALEventConverter;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/slot/PostgreSQLSlotManager.java
 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/slot/PostgreSQLSlotManager.java
new file mode 100644
index 00000000000..e0d08e82ee9
--- /dev/null
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/slot/PostgreSQLSlotManager.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.postgresql.ingest.slot;
+
+import lombok.RequiredArgsConstructor;
+import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.pojo.ReplicationSlotInfo;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Optional;
+
+/**
+ * PostgreSQL slot manager.
+ */
+@RequiredArgsConstructor
+public final class PostgreSQLSlotManager {
+    
+    private static final String DUPLICATE_OBJECT_ERROR_CODE = "42710";
+    
+    private final String decodePlugin;
+    
+    /**
+     * Create slot.
+     *
+     * @param connection connection
+     * @param slotNameSuffix slot name suffix
+     * @throws SQLException SQL exception
+     */
+    public void create(final Connection connection, final String 
slotNameSuffix) throws SQLException {
+        String slotName = 
PostgreSQLSlotNameGenerator.getUniqueSlotName(connection, slotNameSuffix);
+        Optional<ReplicationSlotInfo> slotInfo = load(connection, slotName);
+        if (!slotInfo.isPresent()) {
+            doCreate(connection, slotName);
+            return;
+        }
+        if (null == slotInfo.get().getDatabaseName()) {
+            doDrop(connection, slotName);
+            doCreate(connection, slotName);
+        }
+    }
+    
+    private Optional<ReplicationSlotInfo> load(final Connection connection, 
final String slotName) throws SQLException {
+        String sql = "SELECT slot_name, database FROM pg_replication_slots 
WHERE slot_name=? AND plugin=?";
+        try (PreparedStatement preparedStatement = 
connection.prepareStatement(sql)) {
+            preparedStatement.setString(1, slotName);
+            preparedStatement.setString(2, decodePlugin);
+            try (ResultSet resultSet = preparedStatement.executeQuery()) {
+                return resultSet.next() ? Optional.of(new 
ReplicationSlotInfo(resultSet.getString(1), resultSet.getString(2))) : 
Optional.empty();
+            }
+        }
+    }
+    
+    private void doCreate(final Connection connection, final String slotName) 
throws SQLException {
+        try (PreparedStatement preparedStatement = 
connection.prepareStatement("SELECT * FROM 
pg_create_logical_replication_slot(?, ?)")) {
+            preparedStatement.setString(1, slotName);
+            preparedStatement.setString(2, decodePlugin);
+            preparedStatement.execute();
+        } catch (final SQLException ex) {
+            if (!DUPLICATE_OBJECT_ERROR_CODE.equals(ex.getSQLState())) {
+                throw ex;
+            }
+        }
+    }
+    
+    /**
+     * Drop slot if existed.
+     *
+     * @param connection connection
+     * @param slotNameSuffix slot name suffix
+     * @throws SQLException SQL exception
+     */
+    public void dropIfExisted(final Connection connection, final String 
slotNameSuffix) throws SQLException {
+        String slotName = 
PostgreSQLSlotNameGenerator.getUniqueSlotName(connection, slotNameSuffix);
+        if (!load(connection, slotName).isPresent()) {
+            return;
+        }
+        doDrop(connection, slotName);
+    }
+    
+    private void doDrop(final Connection connection, final String slotName) 
throws SQLException {
+        try (PreparedStatement preparedStatement = 
connection.prepareStatement("SELECT pg_drop_replication_slot(?)")) {
+            preparedStatement.setString(1, slotName);
+            preparedStatement.execute();
+        }
+    }
+}
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLSlotNameGenerator.java
 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/slot/PostgreSQLSlotNameGenerator.java
similarity index 97%
rename from 
kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLSlotNameGenerator.java
rename to 
kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/slot/PostgreSQLSlotNameGenerator.java
index ab84f236d69..8d9f64bd64e 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLSlotNameGenerator.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/slot/PostgreSQLSlotNameGenerator.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
+package org.apache.shardingsphere.data.pipeline.postgresql.ingest.slot;
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
@@ -37,7 +37,7 @@ public final class PostgreSQLSlotNameGenerator {
      *
      * @param connection connection
      * @param slotNameSuffix slot name suffix
-     * @return unique name by connection
+     * @return unique slot name
      * @throws SQLException failed when get catalog
      */
     public static String getUniqueSlotName(final Connection connection, final 
String slotNameSuffix) throws SQLException {
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
index 9187cb58d9f..1978c634098 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
@@ -26,6 +26,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.In
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.ActualAndLogicTableNameMapper;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
+import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.slot.PostgreSQLSlotNameGenerator;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.PostgreSQLLogicalReplication;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLSlotNameGeneratorTest.java
 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/slot/PostgreSQLSlotNameGeneratorTest.java
similarity index 99%
rename from 
kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLSlotNameGeneratorTest.java
rename to 
kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/slot/PostgreSQLSlotNameGeneratorTest.java
index 7db70a30e6f..5b3acbf9184 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLSlotNameGeneratorTest.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/slot/PostgreSQLSlotNameGeneratorTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
+package org.apache.shardingsphere.data.pipeline.postgresql.ingest.slot;
 
 import org.junit.jupiter.api.Test;
 

Reply via email to