This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 8faf1485f59 Refactor DialectIngestPositionManager (#32495)
8faf1485f59 is described below
commit 8faf1485f5983a5045791c9700aa470747e0ed0f
Author: Liang Zhang <[email protected]>
AuthorDate: Tue Aug 13 22:00:01 2024 +0800
Refactor DialectIngestPositionManager (#32495)
* Refactor MySQLIngestPositionManager
* Refactor DialectIngestPositionManager
* Refactor DialectIngestPositionManager
* Refactor DialectIngestPositionManager
---
.../position/DialectIngestPositionManager.java | 16 ++--
.../mysql/ingest/MySQLIngestPositionManager.java | 20 ++---
.../ingest/OpenGaussIngestPositionManager.java | 95 ++++++++++------------
.../ingest/PostgreSQLIngestPositionManager.java | 49 +++++------
.../ingest/wal/decode/BaseLogSequenceNumber.java | 10 +--
.../wal/decode/PostgreSQLLogSequenceNumber.java | 2 +-
.../PostgreSQLIngestPositionManagerTest.java | 3 +-
7 files changed, 91 insertions(+), 104 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/DialectIngestPositionManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/DialectIngestPositionManager.java
index 02210c7ebdd..657bb8f4ab9 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/DialectIngestPositionManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/position/DialectIngestPositionManager.java
@@ -30,22 +30,22 @@ import java.sql.SQLException;
public interface DialectIngestPositionManager extends DatabaseTypedSPI {
/**
- * Init position by data source.
+ * Init position by string data.
*
- * @param dataSource data source
- * @param slotNameSuffix slot name suffix
+ * @param data string data
* @return position
- * @throws SQLException SQL exception
*/
- IngestPosition init(DataSource dataSource, String slotNameSuffix) throws
SQLException;
+ IngestPosition init(String data);
/**
- * Init position by string data.
+ * Init position by data source.
*
- * @param data string data
+ * @param dataSource data source
+ * @param slotNameSuffix slot name suffix
* @return position
+ * @throws SQLException SQL exception
*/
- IngestPosition init(String data);
+ IngestPosition init(DataSource dataSource, String slotNameSuffix) throws
SQLException;
/**
* Clean up by data source if necessary.
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIngestPositionManager.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIngestPositionManager.java
index cbc3170f584..aff2ef03b26 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIngestPositionManager.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIngestPositionManager.java
@@ -32,13 +32,6 @@ import java.sql.SQLException;
*/
public final class MySQLIngestPositionManager implements
DialectIngestPositionManager {
- @Override
- public BinlogPosition init(final DataSource dataSource, final String
slotNameSuffix) throws SQLException {
- try (Connection connection = dataSource.getConnection()) {
- return getBinlogPosition(connection);
- }
- }
-
@Override
public BinlogPosition init(final String data) {
String[] array = data.split("#");
@@ -46,17 +39,20 @@ public final class MySQLIngestPositionManager implements
DialectIngestPositionMa
return new BinlogPosition(array[0], Long.parseLong(array[1]));
}
+ @Override
+ public BinlogPosition init(final DataSource dataSource, final String
slotNameSuffix) throws SQLException {
+ try (Connection connection = dataSource.getConnection()) {
+ return getBinlogPosition(connection);
+ }
+ }
+
private BinlogPosition getBinlogPosition(final Connection connection)
throws SQLException {
- String filename;
- long position;
try (
PreparedStatement preparedStatement =
connection.prepareStatement("SHOW MASTER STATUS");
ResultSet resultSet = preparedStatement.executeQuery()) {
resultSet.next();
- filename = resultSet.getString(1);
- position = resultSet.getLong(2);
+ return new BinlogPosition(resultSet.getString(1),
resultSet.getLong(2));
}
- return new BinlogPosition(filename, position);
}
@Override
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussIngestPositionManager.java
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussIngestPositionManager.java
index 9b885d432d4..776932449b7 100644
---
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussIngestPositionManager.java
+++
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/OpenGaussIngestPositionManager.java
@@ -19,18 +19,18 @@ package
org.apache.shardingsphere.data.pipeline.opengauss.ingest;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIngestPositionManager;
import
org.apache.shardingsphere.data.pipeline.opengauss.ingest.wal.decode.OpenGaussLogSequenceNumber;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.pojo.ReplicationSlotInfo;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIngestPositionManager;
import org.opengauss.replication.LogSequenceNumber;
import javax.sql.DataSource;
-import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Statement;
import java.util.Optional;
/**
@@ -46,12 +46,18 @@ public final class OpenGaussIngestPositionManager
implements DialectIngestPositi
private static final String DUPLICATE_OBJECT_ERROR_CODE = "42710";
- @Override
- public WALPosition init(final DataSource dataSource, final String
slotNameSuffix) throws SQLException {
- try (Connection connection = dataSource.getConnection()) {
- createSlotIfNotExist(connection, slotNameSuffix);
- return getWalPosition(connection);
- }
+ /**
+ * Get the unique slot name by connection.
+ *
+ * @param connection connection
+ * @param slotNameSuffix slot name suffix
+ * @return the unique name by connection
+ * @throws SQLException failed when getCatalog
+ */
+ public static String getUniqueSlotName(final Connection connection, final
String slotNameSuffix) throws SQLException {
+ // same as PostgreSQL, but length over 64 will throw an exception
directly
+ String slotName = DigestUtils.md5Hex(String.join("_",
connection.getCatalog(), slotNameSuffix).getBytes());
+ return String.format("%s_%s", SLOT_NAME_PREFIX, slotName);
}
@Override
@@ -59,23 +65,24 @@ public final class OpenGaussIngestPositionManager
implements DialectIngestPositi
return new WALPosition(new
OpenGaussLogSequenceNumber(LogSequenceNumber.valueOf(data)));
}
- /**
- * Create logical replication slot if it does not exist.
- *
- * @param connection connection
- * @param slotNameSuffix slotName suffix
- * @throws SQLException SQL exception
- */
+ @Override
+ public WALPosition init(final DataSource dataSource, final String
slotNameSuffix) throws SQLException {
+ try (Connection connection = dataSource.getConnection()) {
+ createSlotIfNotExist(connection, slotNameSuffix);
+ return getWALPosition(connection);
+ }
+ }
+
private void createSlotIfNotExist(final Connection connection, final
String slotNameSuffix) throws SQLException {
String slotName = getUniqueSlotName(connection, slotNameSuffix);
Optional<ReplicationSlotInfo> slotInfo = getSlotInfo(connection,
slotName);
if (!slotInfo.isPresent()) {
- createSlotBySQL(connection, slotName);
+ createSlot(connection, slotName);
return;
}
if (null == slotInfo.get().getDatabaseName()) {
dropSlotIfExist(connection, slotName);
- createSlotBySQL(connection, slotName);
+ createSlot(connection, slotName);
}
}
@@ -85,17 +92,15 @@ public final class OpenGaussIngestPositionManager
implements DialectIngestPositi
preparedStatement.setString(1, slotName);
preparedStatement.setString(2, DECODE_PLUGIN);
try (ResultSet resultSet = preparedStatement.executeQuery()) {
- if (!resultSet.next()) {
- return Optional.empty();
- }
- return Optional.of(new
ReplicationSlotInfo(resultSet.getString(1), resultSet.getString(2)));
+ return resultSet.next() ? Optional.of(new
ReplicationSlotInfo(resultSet.getString(1), resultSet.getString(2))) :
Optional.empty();
}
}
}
- private void createSlotBySQL(final Connection connection, final String
slotName) throws SQLException {
- String sql = String.format("SELECT * FROM
pg_create_logical_replication_slot('%s', '%s')", slotName, DECODE_PLUGIN);
- try (PreparedStatement preparedStatement =
connection.prepareStatement(sql)) {
+ private void createSlot(final Connection connection, final String
slotName) throws SQLException {
+ try (PreparedStatement preparedStatement =
connection.prepareStatement("SELECT * FROM
pg_create_logical_replication_slot(?, ?)")) {
+ preparedStatement.setString(1, slotName);
+ preparedStatement.setString(2, DECODE_PLUGIN);
preparedStatement.execute();
} catch (final SQLException ex) {
if (!DUPLICATE_OBJECT_ERROR_CODE.equals(ex.getSQLState())) {
@@ -104,10 +109,21 @@ public final class OpenGaussIngestPositionManager
implements DialectIngestPositi
}
}
- private WALPosition getWalPosition(final Connection connection) throws
SQLException {
+ private void dropSlotIfExist(final Connection connection, final String
slotName) throws SQLException {
+ if (!getSlotInfo(connection, slotName).isPresent()) {
+ log.info("dropSlotIfExist, slot not exist, ignore, slotName={}",
slotName);
+ return;
+ }
+ try (PreparedStatement preparedStatement =
connection.prepareStatement("SELECT * from pg_drop_replication_slot(?)")) {
+ preparedStatement.setString(1, slotName);
+ preparedStatement.execute();
+ }
+ }
+
+ private WALPosition getWALPosition(final Connection connection) throws
SQLException {
try (
- PreparedStatement preparedStatement =
connection.prepareStatement("SELECT PG_CURRENT_XLOG_LOCATION()");
- ResultSet resultSet = preparedStatement.executeQuery()) {
+ Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery("SELECT
PG_CURRENT_XLOG_LOCATION()")) {
resultSet.next();
return new WALPosition(new
OpenGaussLogSequenceNumber(LogSequenceNumber.valueOf(resultSet.getString(1))));
}
@@ -120,31 +136,6 @@ public final class OpenGaussIngestPositionManager
implements DialectIngestPositi
}
}
- private void dropSlotIfExist(final Connection connection, final String
slotName) throws SQLException {
- if (!getSlotInfo(connection, slotName).isPresent()) {
- log.info("dropSlotIfExist, slot not exist, ignore, slotName={}",
slotName);
- return;
- }
- String sql = String.format("select * from
pg_drop_replication_slot('%s')", slotName);
- try (CallableStatement callableStatement =
connection.prepareCall(sql)) {
- callableStatement.execute();
- }
- }
-
- /**
- * Get the unique slot name by connection.
- *
- * @param connection connection
- * @param slotNameSuffix slot name suffix
- * @return the unique name by connection
- * @throws SQLException failed when getCatalog
- */
- public static String getUniqueSlotName(final Connection connection, final
String slotNameSuffix) throws SQLException {
- // same as PostgreSQL, but length over 64 will throw an exception
directly
- String slotName = DigestUtils.md5Hex(String.join("_",
connection.getCatalog(), slotNameSuffix).getBytes());
- return String.format("%s_%s", SLOT_NAME_PREFIX, slotName);
- }
-
@Override
public String getDatabaseType() {
return "openGauss";
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManager.java
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManager.java
index d3db37369bd..389ba0ace30 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManager.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManager.java
@@ -20,9 +20,9 @@ package
org.apache.shardingsphere.data.pipeline.postgresql.ingest;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIngestPositionManager;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.WALPosition;
import
org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode.PostgreSQLLogSequenceNumber;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.position.DialectIngestPositionManager;
import org.postgresql.replication.LogSequenceNumber;
import javax.sql.DataSource;
@@ -43,12 +43,18 @@ public final class PostgreSQLIngestPositionManager
implements DialectIngestPosit
private static final String DUPLICATE_OBJECT_ERROR_CODE = "42710";
- @Override
- public WALPosition init(final DataSource dataSource, final String
slotNameSuffix) throws SQLException {
- try (Connection connection = dataSource.getConnection()) {
- createSlotIfNotExist(connection, getUniqueSlotName(connection,
slotNameSuffix));
- return getWalPosition(connection);
- }
+ /**
+ * Get the unique slot name by connection.
+ *
+ * @param connection the connection
+ * @param slotNameSuffix slot name suffix
+ * @return the unique name by connection
+ * @throws SQLException failed when getCatalog
+ */
+ public static String getUniqueSlotName(final Connection connection, final
String slotNameSuffix) throws SQLException {
+ // PostgreSQL slot name maximum length can't exceed 64,automatic
truncation when the length exceeds the limit
+ String slotName = DigestUtils.md5Hex(String.join("_",
connection.getCatalog(), slotNameSuffix).getBytes());
+ return String.format("%s_%s", SLOT_NAME_PREFIX, slotName);
}
@Override
@@ -56,13 +62,22 @@ public final class PostgreSQLIngestPositionManager
implements DialectIngestPosit
return new WALPosition(new
PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(data)));
}
+ @Override
+ public WALPosition init(final DataSource dataSource, final String
slotNameSuffix) throws SQLException {
+ try (Connection connection = dataSource.getConnection()) {
+ createSlotIfNotExist(connection, getUniqueSlotName(connection,
slotNameSuffix));
+ return getWALPosition(connection);
+ }
+ }
+
private void createSlotIfNotExist(final Connection connection, final
String slotName) throws SQLException {
if (isSlotExisting(connection, slotName)) {
log.info("createSlotIfNotExist, slot exist, slotName={}",
slotName);
return;
}
- String createSlotSQL = String.format("SELECT * FROM
pg_create_logical_replication_slot('%s', '%s')", slotName, DECODE_PLUGIN);
- try (PreparedStatement preparedStatement =
connection.prepareStatement(createSlotSQL)) {
+ try (PreparedStatement preparedStatement =
connection.prepareStatement("SELECT * FROM
pg_create_logical_replication_slot(?, ?)")) {
+ preparedStatement.setString(1, slotName);
+ preparedStatement.setString(2, DECODE_PLUGIN);
preparedStatement.execute();
} catch (final SQLException ex) {
if (!DUPLICATE_OBJECT_ERROR_CODE.equals(ex.getSQLState())) {
@@ -82,7 +97,7 @@ public final class PostgreSQLIngestPositionManager implements
DialectIngestPosit
}
}
- private WALPosition getWalPosition(final Connection connection) throws
SQLException {
+ private WALPosition getWALPosition(final Connection connection) throws
SQLException {
try (
PreparedStatement preparedStatement =
connection.prepareStatement(getLogSequenceNumberSQL(connection));
ResultSet resultSet = preparedStatement.executeQuery()) {
@@ -122,20 +137,6 @@ public final class PostgreSQLIngestPositionManager
implements DialectIngestPosit
}
}
- /**
- * Get the unique slot name by connection.
- *
- * @param connection the connection
- * @param slotNameSuffix slot name suffix
- * @return the unique name by connection
- * @throws SQLException failed when getCatalog
- */
- public static String getUniqueSlotName(final Connection connection, final
String slotNameSuffix) throws SQLException {
- // PostgreSQL slot name maximum length can't exceed 64,automatic
truncation when the length exceeds the limit
- String slotName = DigestUtils.md5Hex(String.join("_",
connection.getCatalog(), slotNameSuffix).getBytes());
- return String.format("%s_%s", SLOT_NAME_PREFIX, slotName);
- }
-
@Override
public String getDatabaseType() {
return "PostgreSQL";
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/BaseLogSequenceNumber.java
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/BaseLogSequenceNumber.java
index 5a92412bc96..81409bd1965 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/BaseLogSequenceNumber.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/BaseLogSequenceNumber.java
@@ -18,21 +18,21 @@
package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal.decode;
/**
- * Base of log sequence number interface.
+ * Log sequence number.
*/
public interface BaseLogSequenceNumber {
/**
- * Convert log sequence number to String.
+ * Convert log sequence number to string.
*
- * @return Long the sequence number of String value
+ * @return converted string value
*/
String asString();
/**
- * Get the binded object.
+ * Get bound log sequence number.
*
- * @return Object the bind log sequence number
+ * @return bound log sequence number
*/
Object get();
}
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/PostgreSQLLogSequenceNumber.java
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/PostgreSQLLogSequenceNumber.java
index 2b15f5d09da..5392ec97454 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/PostgreSQLLogSequenceNumber.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/PostgreSQLLogSequenceNumber.java
@@ -22,7 +22,7 @@ import lombok.ToString;
import org.postgresql.replication.LogSequenceNumber;
/**
- * PostgreSQL sequence.
+ * Log sequence number of PostgreSQL.
*/
@RequiredArgsConstructor
@ToString
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManagerTest.java
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManagerTest.java
index b7e7122b7d8..045aa5b3025 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManagerTest.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLIngestPositionManagerTest.java
@@ -65,8 +65,7 @@ class PostgreSQLIngestPositionManagerTest {
when(connection.getCatalog()).thenReturn("sharding_db");
when(connection.getMetaData()).thenReturn(databaseMetaData);
PreparedStatement lsn96PreparedStatement = mockPostgreSQL96LSN();
- when(connection.prepareStatement(String.format("SELECT * FROM
pg_create_logical_replication_slot('%s', '%s')",
PostgreSQLIngestPositionManager.getUniqueSlotName(connection, ""),
- "test_decoding"))).thenReturn(mock(PreparedStatement.class));
+ when(connection.prepareStatement("SELECT * FROM
pg_create_logical_replication_slot(?,
?)")).thenReturn(mock(PreparedStatement.class));
when(connection.prepareStatement("SELECT
PG_CURRENT_XLOG_LOCATION()")).thenReturn(lsn96PreparedStatement);
PreparedStatement lsn10PreparedStatement = mockPostgreSQL10LSN();
when(connection.prepareStatement("SELECT
PG_CURRENT_WAL_LSN()")).thenReturn(lsn10PreparedStatement);