This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 19322c196f9 Refactor PostgreSQLIngestPositionManager (#32499)
19322c196f9 is described below

commit 19322c196f91b1733f4186de67e04d96ba1c455b
Author: Liang Zhang <[email protected]>
AuthorDate: Wed Aug 14 11:41:38 2024 +0800

    Refactor PostgreSQLIngestPositionManager (#32499)
    
    * Refactor OpenGaussIngestPositionManager
    
    * Refactor PostgreSQLIngestPositionManager
    
    * Refactor PostgreSQLIngestPositionManager
    
    * Refactor PostgreSQLIngestPositionManager
---
 .../ingest/OpenGaussIngestPositionManager.java     |  8 ++--
 .../ingest/PostgreSQLIngestPositionManager.java    | 54 ++++++++++++----------
 .../PostgreSQLIngestPositionManagerTest.java       |  2 +-
 3 files changed, 34 insertions(+), 30 deletions(-)

diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussIngestPositionManager.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussIngestPositionManager.java
index 8dc1282d05a..1163e8ccc5b 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussIngestPositionManager.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussIngestPositionManager.java
@@ -52,13 +52,12 @@ public final class OpenGaussIngestPositionManager implements DialectIngestPositi
     @Override
     public WALPosition init(final DataSource dataSource, final String slotNameSuffix) throws SQLException {
         try (Connection connection = dataSource.getConnection()) {
-            createSlotIfNotExist(connection, slotNameSuffix);
+            createSlotIfNotExist(connection, PostgreSQLSlotNameGenerator.getUniqueSlotName(connection, slotNameSuffix));
             return getWALPosition(connection);
         }
     }
     
-    private void createSlotIfNotExist(final Connection connection, final String slotNameSuffix) throws SQLException {
-        String slotName = PostgreSQLSlotNameGenerator.getUniqueSlotName(connection, slotNameSuffix);
+    private void createSlotIfNotExist(final Connection connection, final String slotName) throws SQLException {
         Optional<ReplicationSlotInfo> slotInfo = getSlotInfo(connection, slotName);
         if (!slotInfo.isPresent()) {
             createSlot(connection, slotName);
@@ -95,10 +94,9 @@ public final class OpenGaussIngestPositionManager implements DialectIngestPositi
     
     private void dropSlotIfExist(final Connection connection, final String slotName) throws SQLException {
         if (!getSlotInfo(connection, slotName).isPresent()) {
-            log.info("dropSlotIfExist, slot not exist, ignore, slotName={}", slotName);
             return;
         }
-        try (PreparedStatement preparedStatement = connection.prepareStatement("SELECT * from pg_drop_replication_slot(?)")) {
+        try (PreparedStatement preparedStatement = connection.prepareStatement("SELECT pg_drop_replication_slot(?)")) {
             preparedStatement.setString(1, slotName);
             preparedStatement.execute();
         }
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManager.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManager.java
index e3d23caea2d..82e4c507719 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManager.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManager.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
 import org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIngestPositionManager;
+import org.apache.shardingsphere.data.pipeline.postgresql.ingest.pojo.ReplicationSlotInfo;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
 import org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
 import org.postgresql.replication.LogSequenceNumber;
@@ -29,6 +30,7 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.Optional;
 
 /**
  * Ingest position manager for PostgreSQL.
@@ -54,10 +56,29 @@ public final class PostgreSQLIngestPositionManager implements DialectIngestPosit
     }
     
     private void createSlotIfNotExist(final Connection connection, final String slotName) throws SQLException {
-        if (isSlotExisting(connection, slotName)) {
-            log.info("createSlotIfNotExist, slot exist, slotName={}", slotName);
+        Optional<ReplicationSlotInfo> slotInfo = getSlotInfo(connection, slotName);
+        if (!slotInfo.isPresent()) {
+            createSlot(connection, slotName);
             return;
         }
+        if (null == slotInfo.get().getDatabaseName()) {
+            dropSlotIfExist(connection, slotName);
+            createSlot(connection, slotName);
+        }
+    }
+    
+    private Optional<ReplicationSlotInfo> getSlotInfo(final Connection connection, final String slotName) throws SQLException {
+        String sql = "SELECT slot_name, database FROM pg_replication_slots WHERE slot_name=? AND plugin=?";
+        try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
+            preparedStatement.setString(1, slotName);
+            preparedStatement.setString(2, DECODE_PLUGIN);
+            try (ResultSet resultSet = preparedStatement.executeQuery()) {
+                return resultSet.next() ? Optional.of(new ReplicationSlotInfo(resultSet.getString(1), resultSet.getString(2))) : Optional.empty();
+            }
+        }
+    }
+    
+    private void createSlot(final Connection connection, final String slotName) throws SQLException {
         try (PreparedStatement preparedStatement = connection.prepareStatement("SELECT * FROM pg_create_logical_replication_slot(?, ?)")) {
             preparedStatement.setString(1, slotName);
             preparedStatement.setString(2, DECODE_PLUGIN);
@@ -69,14 +90,13 @@ public final class PostgreSQLIngestPositionManager implements DialectIngestPosit
         }
     }
     
-    private boolean isSlotExisting(final Connection connection, final String slotName) throws SQLException {
-        String checkSlotSQL = "SELECT slot_name FROM pg_replication_slots WHERE slot_name=? AND plugin=?";
-        try (PreparedStatement preparedStatement = connection.prepareStatement(checkSlotSQL)) {
+    private void dropSlotIfExist(final Connection connection, final String slotName) throws SQLException {
+        if (!getSlotInfo(connection, slotName).isPresent()) {
+            return;
+        }
+        try (PreparedStatement preparedStatement = connection.prepareStatement("SELECT pg_drop_replication_slot(?)")) {
             preparedStatement.setString(1, slotName);
-            preparedStatement.setString(2, DECODE_PLUGIN);
-            try (ResultSet resultSet = preparedStatement.executeQuery()) {
-                return resultSet.next();
-            }
+            preparedStatement.execute();
         }
     }
     
@@ -102,21 +122,7 @@ public final class PostgreSQLIngestPositionManager implements DialectIngestPosit
     @Override
     public void destroy(final DataSource dataSource, final String slotNameSuffix) throws SQLException {
         try (Connection connection = dataSource.getConnection()) {
-            dropSlotIfExist(connection, slotNameSuffix);
-        }
-    }
-    
-    private void dropSlotIfExist(final Connection connection, final String slotNameSuffix) throws SQLException {
-        String slotName = PostgreSQLSlotNameGenerator.getUniqueSlotName(connection, slotNameSuffix);
-        if (!isSlotExisting(connection, slotName)) {
-            log.info("dropSlotIfExist, slot not exist, slotName={}", slotName);
-            return;
-        }
-        log.info("dropSlotIfExist, slot exist, slotName={}", slotName);
-        String dropSlotSQL = "SELECT pg_drop_replication_slot(?)";
-        try (PreparedStatement preparedStatement = connection.prepareStatement(dropSlotSQL)) {
-            preparedStatement.setString(1, slotName);
-            preparedStatement.execute();
+            dropSlotIfExist(connection, PostgreSQLSlotNameGenerator.getUniqueSlotName(connection, slotNameSuffix));
         }
     }
     
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManagerTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManagerTest.java
index 045aa5b3025..2c539c2a99a 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManagerTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManagerTest.java
@@ -119,7 +119,7 @@ class PostgreSQLIngestPositionManagerTest {
     @SneakyThrows(SQLException.class)
     private void mockSlotExistsOrNot(final boolean exists) {
         PreparedStatement preparedStatement = mock(PreparedStatement.class);
-        when(connection.prepareStatement("SELECT slot_name FROM pg_replication_slots WHERE slot_name=? AND plugin=?")).thenReturn(preparedStatement);
+        when(connection.prepareStatement("SELECT slot_name, database FROM pg_replication_slots WHERE slot_name=? AND plugin=?")).thenReturn(preparedStatement);
         ResultSet resultSet = mock(ResultSet.class);
         when(preparedStatement.executeQuery()).thenReturn(resultSet);
         when(resultSet.next()).thenReturn(exists);

Reply via email to