This is an automated email from the ASF dual-hosted git repository.

zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 8faf1485f59 Refactor DialectIngestPositionManager (#32495)
8faf1485f59 is described below

commit 8faf1485f5983a5045791c9700aa470747e0ed0f
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Aug 13 22:00:01 2024 +0800

    Refactor DialectIngestPositionManager (#32495)
    
    * Refactor MySQLIngestPositionManager
    
    * Refactor DialectIngestPositionManager
    
    * Refactor DialectIngestPositionManager
    
    * Refactor DialectIngestPositionManager
---
 .../position/DialectIngestPositionManager.java     | 16 ++--
 .../mysql/ingest/MySQLIngestPositionManager.java   | 20 ++---
 .../ingest/OpenGaussIngestPositionManager.java     | 95 ++++++++++------------
 .../ingest/PostgreSQLIngestPositionManager.java    | 49 +++++------
 .../ingest/wal/decode/BaseLogSequenceNumber.java   | 10 +--
 .../wal/decode/PostgreSQLLogSequenceNumber.java    |  2 +-
 .../PostgreSQLIngestPositionManagerTest.java       |  3 +-
 7 files changed, 91 insertions(+), 104 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/DialectIngestPositionManager.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/DialectIngestPositionManager.java
index 02210c7ebdd..657bb8f4ab9 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/DialectIngestPositionManager.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/DialectIngestPositionManager.java
@@ -30,22 +30,22 @@ import java.sql.SQLException;
 public interface DialectIngestPositionManager extends DatabaseTypedSPI {
     
     /**
-     * Init position by data source.
+     * Init position by string data.
      *
-     * @param dataSource data source
-     * @param slotNameSuffix slot name suffix
+     * @param data string data
      * @return position
-     * @throws SQLException SQL exception
      */
-    IngestPosition init(DataSource dataSource, String slotNameSuffix) throws 
SQLException;
+    IngestPosition init(String data);
     
     /**
-     * Init position by string data.
+     * Init position by data source.
      *
-     * @param data string data
+     * @param dataSource data source
+     * @param slotNameSuffix slot name suffix
      * @return position
+     * @throws SQLException SQL exception
      */
-    IngestPosition init(String data);
+    IngestPosition init(DataSource dataSource, String slotNameSuffix) throws 
SQLException;
     
     /**
      * Clean up by data source if necessary.
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIngestPositionManager.java
 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIngestPositionManager.java
index cbc3170f584..aff2ef03b26 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIngestPositionManager.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIngestPositionManager.java
@@ -32,13 +32,6 @@ import java.sql.SQLException;
  */
 public final class MySQLIngestPositionManager implements 
DialectIngestPositionManager {
     
-    @Override
-    public BinlogPosition init(final DataSource dataSource, final String 
slotNameSuffix) throws SQLException {
-        try (Connection connection = dataSource.getConnection()) {
-            return getBinlogPosition(connection);
-        }
-    }
-    
     @Override
     public BinlogPosition init(final String data) {
         String[] array = data.split("#");
@@ -46,17 +39,20 @@ public final class MySQLIngestPositionManager implements 
DialectIngestPositionMa
         return new BinlogPosition(array[0], Long.parseLong(array[1]));
     }
     
+    @Override
+    public BinlogPosition init(final DataSource dataSource, final String 
slotNameSuffix) throws SQLException {
+        try (Connection connection = dataSource.getConnection()) {
+            return getBinlogPosition(connection);
+        }
+    }
+    
     private BinlogPosition getBinlogPosition(final Connection connection) 
throws SQLException {
-        String filename;
-        long position;
         try (
                 PreparedStatement preparedStatement = 
connection.prepareStatement("SHOW MASTER STATUS");
                 ResultSet resultSet = preparedStatement.executeQuery()) {
             resultSet.next();
-            filename = resultSet.getString(1);
-            position = resultSet.getLong(2);
+            return new BinlogPosition(resultSet.getString(1), 
resultSet.getLong(2));
         }
-        return new BinlogPosition(filename, position);
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussIngestPositionManager.java
 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussIngestPositionManager.java
index 9b885d432d4..776932449b7 100644
--- 
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussIngestPositionManager.java
+++ 
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussIngestPositionManager.java
@@ -19,18 +19,18 @@ package 
org.apache.shardingsphere.data.pipeline.opengauss.ingest;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.codec.digest.DigestUtils;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIngestPositionManager;
 import 
org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.OpenGaussLogSequenceNumber;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.pojo.ReplicationSlotInfo;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIngestPositionManager;
 import org.opengauss.replication.LogSequenceNumber;
 
 import javax.sql.DataSource;
-import java.sql.CallableStatement;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.Optional;
 
 /**
@@ -46,12 +46,18 @@ public final class OpenGaussIngestPositionManager 
implements DialectIngestPositi
     
     private static final String DUPLICATE_OBJECT_ERROR_CODE = "42710";
     
-    @Override
-    public WALPosition init(final DataSource dataSource, final String 
slotNameSuffix) throws SQLException {
-        try (Connection connection = dataSource.getConnection()) {
-            createSlotIfNotExist(connection, slotNameSuffix);
-            return getWalPosition(connection);
-        }
+    /**
+     * Get the unique slot name by connection.
+     *
+     * @param connection connection
+     * @param slotNameSuffix slot name suffix
+     * @return the unique name by connection
+     * @throws SQLException failed when getCatalog
+     */
+    public static String getUniqueSlotName(final Connection connection, final 
String slotNameSuffix) throws SQLException {
+        // same as PostgreSQL, but length over 64 will throw an exception 
directly
+        String slotName = DigestUtils.md5Hex(String.join("_", 
connection.getCatalog(), slotNameSuffix).getBytes());
+        return String.format("%s_%s", SLOT_NAME_PREFIX, slotName);
     }
     
     @Override
@@ -59,23 +65,24 @@ public final class OpenGaussIngestPositionManager 
implements DialectIngestPositi
         return new WALPosition(new 
OpenGaussLogSequenceNumber(LogSequenceNumber.valueOf(data)));
     }
     
-    /**
-     * Create logical replication slot if it does not exist.
-     *
-     * @param connection connection
-     * @param slotNameSuffix slotName suffix
-     * @throws SQLException SQL exception
-     */
+    @Override
+    public WALPosition init(final DataSource dataSource, final String 
slotNameSuffix) throws SQLException {
+        try (Connection connection = dataSource.getConnection()) {
+            createSlotIfNotExist(connection, slotNameSuffix);
+            return getWALPosition(connection);
+        }
+    }
+    
     private void createSlotIfNotExist(final Connection connection, final 
String slotNameSuffix) throws SQLException {
         String slotName = getUniqueSlotName(connection, slotNameSuffix);
         Optional<ReplicationSlotInfo> slotInfo = getSlotInfo(connection, 
slotName);
         if (!slotInfo.isPresent()) {
-            createSlotBySQL(connection, slotName);
+            createSlot(connection, slotName);
             return;
         }
         if (null == slotInfo.get().getDatabaseName()) {
             dropSlotIfExist(connection, slotName);
-            createSlotBySQL(connection, slotName);
+            createSlot(connection, slotName);
         }
     }
     
@@ -85,17 +92,15 @@ public final class OpenGaussIngestPositionManager 
implements DialectIngestPositi
             preparedStatement.setString(1, slotName);
             preparedStatement.setString(2, DECODE_PLUGIN);
             try (ResultSet resultSet = preparedStatement.executeQuery()) {
-                if (!resultSet.next()) {
-                    return Optional.empty();
-                }
-                return Optional.of(new 
ReplicationSlotInfo(resultSet.getString(1), resultSet.getString(2)));
+                return resultSet.next() ? Optional.of(new 
ReplicationSlotInfo(resultSet.getString(1), resultSet.getString(2))) : 
Optional.empty();
             }
         }
     }
     
-    private void createSlotBySQL(final Connection connection, final String 
slotName) throws SQLException {
-        String sql = String.format("SELECT * FROM 
pg_create_logical_replication_slot('%s', '%s')", slotName, DECODE_PLUGIN);
-        try (PreparedStatement preparedStatement = 
connection.prepareStatement(sql)) {
+    private void createSlot(final Connection connection, final String 
slotName) throws SQLException {
+        try (PreparedStatement preparedStatement = 
connection.prepareStatement("SELECT * FROM 
pg_create_logical_replication_slot(?, ?)")) {
+            preparedStatement.setString(1, slotName);
+            preparedStatement.setString(2, DECODE_PLUGIN);
             preparedStatement.execute();
         } catch (final SQLException ex) {
             if (!DUPLICATE_OBJECT_ERROR_CODE.equals(ex.getSQLState())) {
@@ -104,10 +109,21 @@ public final class OpenGaussIngestPositionManager 
implements DialectIngestPositi
         }
     }
     
-    private WALPosition getWalPosition(final Connection connection) throws 
SQLException {
+    private void dropSlotIfExist(final Connection connection, final String 
slotName) throws SQLException {
+        if (!getSlotInfo(connection, slotName).isPresent()) {
+            log.info("dropSlotIfExist, slot not exist, ignore, slotName={}", 
slotName);
+            return;
+        }
+        try (PreparedStatement preparedStatement = 
connection.prepareStatement("SELECT * from pg_drop_replication_slot(?)")) {
+            preparedStatement.setString(1, slotName);
+            preparedStatement.execute();
+        }
+    }
+    
+    private WALPosition getWALPosition(final Connection connection) throws 
SQLException {
         try (
-                PreparedStatement preparedStatement = 
connection.prepareStatement("SELECT PG_CURRENT_XLOG_LOCATION()");
-                ResultSet resultSet = preparedStatement.executeQuery()) {
+                Statement statement = connection.createStatement();
+                ResultSet resultSet = statement.executeQuery("SELECT 
PG_CURRENT_XLOG_LOCATION()")) {
             resultSet.next();
             return new WALPosition(new 
OpenGaussLogSequenceNumber(LogSequenceNumber.valueOf(resultSet.getString(1))));
         }
@@ -120,31 +136,6 @@ public final class OpenGaussIngestPositionManager 
implements DialectIngestPositi
         }
     }
     
-    private void dropSlotIfExist(final Connection connection, final String 
slotName) throws SQLException {
-        if (!getSlotInfo(connection, slotName).isPresent()) {
-            log.info("dropSlotIfExist, slot not exist, ignore, slotName={}", 
slotName);
-            return;
-        }
-        String sql = String.format("select * from 
pg_drop_replication_slot('%s')", slotName);
-        try (CallableStatement callableStatement = 
connection.prepareCall(sql)) {
-            callableStatement.execute();
-        }
-    }
-    
-    /**
-     * Get the unique slot name by connection.
-     *
-     * @param connection connection
-     * @param slotNameSuffix slot name suffix
-     * @return the unique name by connection
-     * @throws SQLException failed when getCatalog
-     */
-    public static String getUniqueSlotName(final Connection connection, final 
String slotNameSuffix) throws SQLException {
-        // same as PostgreSQL, but length over 64 will throw an exception 
directly
-        String slotName = DigestUtils.md5Hex(String.join("_", 
connection.getCatalog(), slotNameSuffix).getBytes());
-        return String.format("%s_%s", SLOT_NAME_PREFIX, slotName);
-    }
-    
     @Override
     public String getDatabaseType() {
         return "openGauss";
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManager.java
 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManager.java
index d3db37369bd..389ba0ace30 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManager.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManager.java
@@ -20,9 +20,9 @@ package 
org.apache.shardingsphere.data.pipeline.postgresql.ingest;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.codec.digest.DigestUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIngestPositionManager;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIngestPositionManager;
 import org.postgresql.replication.LogSequenceNumber;
 
 import javax.sql.DataSource;
@@ -43,12 +43,18 @@ public final class PostgreSQLIngestPositionManager 
implements DialectIngestPosit
     
     private static final String DUPLICATE_OBJECT_ERROR_CODE = "42710";
     
-    @Override
-    public WALPosition init(final DataSource dataSource, final String 
slotNameSuffix) throws SQLException {
-        try (Connection connection = dataSource.getConnection()) {
-            createSlotIfNotExist(connection, getUniqueSlotName(connection, 
slotNameSuffix));
-            return getWalPosition(connection);
-        }
+    /**
+     * Get the unique slot name by connection.
+     *
+     * @param connection the connection
+     * @param slotNameSuffix slot name suffix
+     * @return the unique name by connection
+     * @throws SQLException failed when getCatalog
+     */
+    public static String getUniqueSlotName(final Connection connection, final 
String slotNameSuffix) throws SQLException {
+        // PostgreSQL slot name maximum length can't exceed 64,automatic 
truncation when the length exceeds the limit
+        String slotName = DigestUtils.md5Hex(String.join("_", 
connection.getCatalog(), slotNameSuffix).getBytes());
+        return String.format("%s_%s", SLOT_NAME_PREFIX, slotName);
     }
     
     @Override
@@ -56,13 +62,22 @@ public final class PostgreSQLIngestPositionManager 
implements DialectIngestPosit
         return new WALPosition(new 
PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(data)));
     }
     
+    @Override
+    public WALPosition init(final DataSource dataSource, final String 
slotNameSuffix) throws SQLException {
+        try (Connection connection = dataSource.getConnection()) {
+            createSlotIfNotExist(connection, getUniqueSlotName(connection, 
slotNameSuffix));
+            return getWALPosition(connection);
+        }
+    }
+    
     private void createSlotIfNotExist(final Connection connection, final 
String slotName) throws SQLException {
         if (isSlotExisting(connection, slotName)) {
             log.info("createSlotIfNotExist, slot exist, slotName={}", 
slotName);
             return;
         }
-        String createSlotSQL = String.format("SELECT * FROM 
pg_create_logical_replication_slot('%s', '%s')", slotName, DECODE_PLUGIN);
-        try (PreparedStatement preparedStatement = 
connection.prepareStatement(createSlotSQL)) {
+        try (PreparedStatement preparedStatement = 
connection.prepareStatement("SELECT * FROM 
pg_create_logical_replication_slot(?, ?)")) {
+            preparedStatement.setString(1, slotName);
+            preparedStatement.setString(2, DECODE_PLUGIN);
             preparedStatement.execute();
         } catch (final SQLException ex) {
             if (!DUPLICATE_OBJECT_ERROR_CODE.equals(ex.getSQLState())) {
@@ -82,7 +97,7 @@ public final class PostgreSQLIngestPositionManager implements 
DialectIngestPosit
         }
     }
     
-    private WALPosition getWalPosition(final Connection connection) throws 
SQLException {
+    private WALPosition getWALPosition(final Connection connection) throws 
SQLException {
         try (
                 PreparedStatement preparedStatement = 
connection.prepareStatement(getLogSequenceNumberSQL(connection));
                 ResultSet resultSet = preparedStatement.executeQuery()) {
@@ -122,20 +137,6 @@ public final class PostgreSQLIngestPositionManager 
implements DialectIngestPosit
         }
     }
     
-    /**
-     * Get the unique slot name by connection.
-     *
-     * @param connection the connection
-     * @param slotNameSuffix slot name suffix
-     * @return the unique name by connection
-     * @throws SQLException failed when getCatalog
-     */
-    public static String getUniqueSlotName(final Connection connection, final 
String slotNameSuffix) throws SQLException {
-        // PostgreSQL slot name maximum length can't exceed 64,automatic 
truncation when the length exceeds the limit
-        String slotName = DigestUtils.md5Hex(String.join("_", 
connection.getCatalog(), slotNameSuffix).getBytes());
-        return String.format("%s_%s", SLOT_NAME_PREFIX, slotName);
-    }
-    
     @Override
     public String getDatabaseType() {
         return "PostgreSQL";
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/BaseLogSequenceNumber.java
 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/BaseLogSequenceNumber.java
index 5a92412bc96..81409bd1965 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/BaseLogSequenceNumber.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/BaseLogSequenceNumber.java
@@ -18,21 +18,21 @@
 package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode;
 
 /**
- * Base of log sequence number interface.
+ * Log sequence number.
  */
 public interface BaseLogSequenceNumber {
     
     /**
-     * Convert log sequence number to String.
+     * Convert log sequence number to string.
      *
-     * @return Long the sequence number of String value
+     * @return converted string value
      */
     String asString();
     
     /**
-     * Get the binded object.
+     * Get bound log sequence number.
      *
-     * @return Object the bind log sequence number
+     * @return bound log sequence number
      */
     Object get();
 }
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/PostgreSQLLogSequenceNumber.java
 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/PostgreSQLLogSequenceNumber.java
index 2b15f5d09da..5392ec97454 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/PostgreSQLLogSequenceNumber.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/PostgreSQLLogSequenceNumber.java
@@ -22,7 +22,7 @@ import lombok.ToString;
 import org.postgresql.replication.LogSequenceNumber;
 
 /**
- * PostgreSQL sequence.
+ * Log sequence number of PostgreSQL.
  */
 @RequiredArgsConstructor
 @ToString
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManagerTest.java
 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManagerTest.java
index b7e7122b7d8..045aa5b3025 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManagerTest.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManagerTest.java
@@ -65,8 +65,7 @@ class PostgreSQLIngestPositionManagerTest {
         when(connection.getCatalog()).thenReturn("sharding_db");
         when(connection.getMetaData()).thenReturn(databaseMetaData);
         PreparedStatement lsn96PreparedStatement = mockPostgreSQL96LSN();
-        when(connection.prepareStatement(String.format("SELECT * FROM 
pg_create_logical_replication_slot('%s', '%s')", 
PostgreSQLIngestPositionManager.getUniqueSlotName(connection, ""),
-                "test_decoding"))).thenReturn(mock(PreparedStatement.class));
+        when(connection.prepareStatement("SELECT * FROM 
pg_create_logical_replication_slot(?, 
?)")).thenReturn(mock(PreparedStatement.class));
         when(connection.prepareStatement("SELECT 
PG_CURRENT_XLOG_LOCATION()")).thenReturn(lsn96PreparedStatement);
         PreparedStatement lsn10PreparedStatement = mockPostgreSQL10LSN();
         when(connection.prepareStatement("SELECT 
PG_CURRENT_WAL_LSN()")).thenReturn(lsn10PreparedStatement);

Reply via email to