This is an automated email from the ASF dual-hosted git repository.

zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new f08ce7ba8a [Fix][Connector-V2][Postgres-CDC] Fix PostgreSQL GEOMETRY handling with JDBC sink (#10186)
f08ce7ba8a is described below

commit f08ce7ba8a55863a6f110e07e5611523beb30fc7
Author: yzeng1618 <[email protected]>
AuthorDate: Thu Jan 1 08:34:08 2026 +0800

    [Fix][Connector-V2][Postgres-CDC] Fix PostgreSQL GEOMETRY handling with JDBC sink (#10186)
    
    Co-authored-by: zengyi <[email protected]>
---
 docs/en/connector-v2/sink/PostgreSql.md            |  4 +-
 docs/en/connector-v2/source/PostgreSQL-CDC.md      |  2 +-
 docs/zh/connector-v2/source/PostgreSQL-CDC.md      |  4 +-
 ...TunnelRowDebeziumDeserializationConverters.java | 29 +++++++
 ...elRowDebeziumDeserializationConvertersTest.java | 79 ++++++++++++++++++
 .../dialect/psql/PostgresJdbcRowConverter.java     | 43 ++++++++--
 .../dialect/psql/PostgresTypeConverter.java        |  4 +-
 .../dialect/psql/PostgresJdbcRowConverterTest.java | 95 ++++++++++++++++++++++
 .../src/test/resources/ddl/inventory.sql           | 24 ++++--
 9 files changed, 264 insertions(+), 20 deletions(-)

diff --git a/docs/en/connector-v2/sink/PostgreSql.md 
b/docs/en/connector-v2/sink/PostgreSql.md
index 6d7cfbaacf..290eb9517d 100644
--- a/docs/en/connector-v2/sink/PostgreSql.md
+++ b/docs/en/connector-v2/sink/PostgreSql.md
@@ -38,7 +38,7 @@ semantics (using XA transaction guarantee).
 | Datasource |                     Supported Versions                     |    
    Driver         |                  Url                  |                    
              Maven                                   |
 
|------------|------------------------------------------------------------|-----------------------|---------------------------------------|--------------------------------------------------------------------------|
 | PostgreSQL | Different dependency version has different driver class.   | 
org.postgresql.Driver | jdbc:postgresql://localhost:5432/test | 
[Download](https://mvnrepository.com/artifact/org.postgresql/postgresql) |
-| PostgreSQL | If you want to manipulate the GEOMETRY type in PostgreSQL. | 
org.postgresql.Driver | jdbc:postgresql://localhost:5432/test | 
[Download](https://mvnrepository.com/artifact/net.postgis/postgis-jdbc)  |
+| PostgreSQL | If you want to manipulate the GEOMETRY/GEOGRAPHY type in 
PostgreSQL. | org.postgresql.Driver | jdbc:postgresql://localhost:5432/test | 
[Download](https://mvnrepository.com/artifact/net.postgis/postgis-jdbc)  |
 
 ## Database Dependency
 
@@ -275,4 +275,4 @@ sink {
 
 ## Changelog
 
-<ChangeLog />
\ No newline at end of file
+<ChangeLog />
diff --git a/docs/en/connector-v2/source/PostgreSQL-CDC.md 
b/docs/en/connector-v2/source/PostgreSQL-CDC.md
index 493a0c432e..db00d1c9d5 100644
--- a/docs/en/connector-v2/source/PostgreSQL-CDC.md
+++ b/docs/en/connector-v2/source/PostgreSQL-CDC.md
@@ -28,7 +28,7 @@ describes how to set up the Postgre CDC connector to run SQL 
queries against Pos
 | Datasource |                     Supported versions                     |    
    Driver         |                  Url                  |                    
              Maven                                   |
 
|------------|------------------------------------------------------------|-----------------------|---------------------------------------|--------------------------------------------------------------------------|
 | PostgreSQL | Different dependency version has different driver class.   | 
org.postgresql.Driver | jdbc:postgresql://localhost:5432/test | 
[Download](https://mvnrepository.com/artifact/org.postgresql/postgresql) |
-| PostgreSQL | If you want to manipulate the GEOMETRY type in PostgreSQL. | 
org.postgresql.Driver | jdbc:postgresql://localhost:5432/test | 
[Download](https://mvnrepository.com/artifact/net.postgis/postgis-jdbc)  |
+| PostgreSQL | If you want to manipulate the GEOMETRY/GEOGRAPHY type in 
PostgreSQL. | org.postgresql.Driver | jdbc:postgresql://localhost:5432/test | 
[Download](https://mvnrepository.com/artifact/net.postgis/postgis-jdbc)  |
 
 ## Using Dependency
 
diff --git a/docs/zh/connector-v2/source/PostgreSQL-CDC.md 
b/docs/zh/connector-v2/source/PostgreSQL-CDC.md
index 66e4767ae7..84f37fa9f5 100644
--- a/docs/zh/connector-v2/source/PostgreSQL-CDC.md
+++ b/docs/zh/connector-v2/source/PostgreSQL-CDC.md
@@ -27,7 +27,7 @@ Postgre CDC 连接器允许从 Postgre 数据库读取快照数据和增量数
 | 数据源      |                     支持的版本                      |        驱动        
|                  Url                  |                                  
Maven                                   |
 
|------------|-----------------------------------------------------|---------------------|---------------------------------------|--------------------------------------------------------------------------|
 | PostgreSQL | 不同的依赖版本有不同的驱动类。                       | org.postgresql.Driver | 
jdbc:postgresql://localhost:5432/test | 
[下载](https://mvnrepository.com/artifact/org.postgresql/postgresql) |
-| PostgreSQL | 如果您想在 PostgreSQL 中操作 GEOMETRY 类型。        | 
org.postgresql.Driver | jdbc:postgresql://localhost:5432/test | 
[下载](https://mvnrepository.com/artifact/net.postgis/postgis-jdbc)  |
+| PostgreSQL | 如果您想在 PostgreSQL 中操作 GEOMETRY/GEOGRAPHY 类型。        | 
org.postgresql.Driver | jdbc:postgresql://localhost:5432/test | 
[下载](https://mvnrepository.com/artifact/net.postgis/postgis-jdbc)  |
 
 ## 使用依赖
 
@@ -190,4 +190,4 @@ source {
 
 ## 变更日志
 
-<ChangeLog />
\ No newline at end of file
+<ChangeLog />
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConverters.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConverters.java
index 227d2b7eee..b5982736d5 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConverters.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConverters.java
@@ -36,6 +36,8 @@ import org.apache.kafka.connect.source.SourceRecord;
 
 import io.debezium.data.SpecialValueDecimal;
 import io.debezium.data.VariableScaleDecimal;
+import io.debezium.data.geometry.Geography;
+import io.debezium.data.geometry.Geometry;
 import io.debezium.time.MicroTime;
 import io.debezium.time.MicroTimestamp;
 import io.debezium.time.NanoTime;
@@ -458,11 +460,38 @@ public class 
SeaTunnelRowDebeziumDeserializationConverters implements Serializab
 
             @Override
             public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj == null) {
+                    return null;
+                }
+
+                if (schema != null && schema.name() != null && dbzObj 
instanceof Struct) {
+                    String logicalName = schema.name();
+                    if (Geometry.LOGICAL_NAME.equals(logicalName)
+                            || Geography.LOGICAL_NAME.equals(logicalName)) {
+                        return convertGeometryStructToHexWkb((Struct) dbzObj);
+                    }
+                }
+
                 return dbzObj.toString();
             }
         };
     }
 
+    private static String convertGeometryStructToHexWkb(Struct struct) {
+        Object wkbField = struct.get(Geometry.WKB_FIELD);
+        if (!(wkbField instanceof byte[])) {
+            // Fallback to default string representation if the expected field 
is not present.
+            return struct.toString();
+        }
+
+        byte[] wkb = (byte[]) wkbField;
+        StringBuilder sb = new StringBuilder(wkb.length * 2);
+        for (byte b : wkb) {
+            sb.append(String.format("%02X", b));
+        }
+        return sb.toString();
+    }
+
     private static DebeziumDeserializationConverter convertToBinary() {
         return new DebeziumDeserializationConverter() {
             private static final long serialVersionUID = 1L;
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConvertersTest.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConvertersTest.java
index 14098cecc9..c17243b2bd 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConvertersTest.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConvertersTest.java
@@ -34,6 +34,9 @@ import org.apache.kafka.connect.source.SourceRecord;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import io.debezium.data.geometry.Geography;
+import io.debezium.data.geometry.Geometry;
+
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -127,4 +130,80 @@ public class 
SeaTunnelRowDebeziumDeserializationConvertersTest {
                 Arrays.equals(
                         doubles, (Double[]) 
(converter.convert(Arrays.asList(doubles), null))));
     }
+
+    @Test
+    void testGeometryStringConversion() throws Exception {
+        SeaTunnelRowDebeziumDeserializationConverters converters =
+                new SeaTunnelRowDebeziumDeserializationConverters(
+                        new SeaTunnelRowType(
+                                new String[] {"geo"},
+                                new SeaTunnelDataType[] 
{BasicType.STRING_TYPE}),
+                        new MetadataConverter[] {},
+                        ZoneId.systemDefault(),
+                        DebeziumDeserializationConverterFactory.DEFAULT);
+
+        byte[] wkb = new byte[] {0x01, 0x02, (byte) 0xFF};
+        Schema geometrySchema = Geometry.builder().optional().build();
+        Schema recordSchema = SchemaBuilder.struct().field("geo", 
geometrySchema).build();
+
+        Struct geometryValue = Geometry.createValue(geometrySchema, wkb, 4549);
+        Struct recordValue = new Struct(recordSchema);
+        recordValue.put("geo", geometryValue);
+
+        SourceRecord record =
+                new SourceRecord(
+                        new HashMap<>(),
+                        new HashMap<>(),
+                        "topicName",
+                        null,
+                        SchemaBuilder.int32().build(),
+                        1,
+                        recordSchema,
+                        recordValue,
+                        null,
+                        new ArrayList<>());
+
+        SeaTunnelRow row = converters.convert(record, recordValue, 
recordSchema);
+        Object fieldValue = row.getField(0);
+        Assertions.assertTrue(fieldValue instanceof String);
+        Assertions.assertEquals("0102FF", fieldValue);
+    }
+
+    @Test
+    void testGeographyStringConversion() throws Exception {
+        SeaTunnelRowDebeziumDeserializationConverters converters =
+                new SeaTunnelRowDebeziumDeserializationConverters(
+                        new SeaTunnelRowType(
+                                new String[] {"geo"},
+                                new SeaTunnelDataType[] 
{BasicType.STRING_TYPE}),
+                        new MetadataConverter[] {},
+                        ZoneId.systemDefault(),
+                        DebeziumDeserializationConverterFactory.DEFAULT);
+
+        byte[] wkb = new byte[] {0x01, 0x02, (byte) 0xFF};
+        Schema geographySchema = Geography.builder().optional().build();
+        Schema recordSchema = SchemaBuilder.struct().field("geo", 
geographySchema).build();
+
+        Struct geographyValue = Geometry.createValue(geographySchema, wkb, 
4549);
+        Struct recordValue = new Struct(recordSchema);
+        recordValue.put("geo", geographyValue);
+
+        SourceRecord record =
+                new SourceRecord(
+                        new HashMap<>(),
+                        new HashMap<>(),
+                        "topicName",
+                        null,
+                        SchemaBuilder.int32().build(),
+                        1,
+                        recordSchema,
+                        recordValue,
+                        null,
+                        new ArrayList<>());
+
+        SeaTunnelRow row = converters.convert(record, recordValue, 
recordSchema);
+        Object fieldValue = row.getField(0);
+        Assertions.assertTrue(fieldValue instanceof String);
+        Assertions.assertEquals("0102FF", fieldValue);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
index fd5f5d0d9c..a19c4814f1 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
@@ -58,6 +58,8 @@ import java.util.Locale;
 import java.util.Optional;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeConverter.PG_CIDR;
+import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeConverter.PG_GEOGRAPHY;
+import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeConverter.PG_GEOMETRY;
 import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeConverter.PG_INET;
 import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeConverter.PG_INTERVAL;
 import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeConverter.PG_MAC_ADDR;
@@ -66,9 +68,6 @@ import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.ps
 @Slf4j
 public class PostgresJdbcRowConverter extends AbstractJdbcRowConverter {
 
-    private static final String PG_GEOMETRY = "GEOMETRY";
-    private static final String PG_GEOGRAPHY = "GEOGRAPHY";
-
     @Override
     public String converterName() {
         return DatabaseIdentifier.POSTGRESQL;
@@ -108,8 +107,8 @@ public class PostgresJdbcRowConverter extends 
AbstractJdbcRowConverter {
                     
rs.getMetaData().getColumnTypeName(resultSetIndex).toUpperCase(Locale.ROOT);
             switch (seaTunnelDataType.getSqlType()) {
                 case STRING:
-                    if (metaDataColumnType.equals(PG_GEOMETRY)
-                            || metaDataColumnType.equals(PG_GEOGRAPHY)) {
+                    if (PG_GEOMETRY.equalsIgnoreCase(metaDataColumnType)
+                            || 
PG_GEOGRAPHY.equalsIgnoreCase(metaDataColumnType)) {
                         Object geoObj = rs.getObject(resultSetIndex);
                         fields[fieldIndex] = geoObj == null ? null : 
geoObj.toString();
                     } else {
@@ -221,8 +220,18 @@ public class PostgresJdbcRowConverter extends 
AbstractJdbcRowConverter {
 
                 switch (seaTunnelDataType.getSqlType()) {
                     case STRING:
-                        String sourceType = sourceTypes[fieldIndex];
-                        if (PG_INET.equalsIgnoreCase(sourceType)
+                        String sourceType =
+                                resolveSourceType(
+                                        rowType, fieldIndex, 
databaseTableSchema, sourceTypes);
+                        if (sourceType != null
+                                && (PG_GEOMETRY.equalsIgnoreCase(sourceType)
+                                        || 
PG_GEOGRAPHY.equalsIgnoreCase(sourceType))) {
+                            // handle PostGIS geometry/geography when 
represented as string
+                            PGobject geometryObject = new PGobject();
+                            
geometryObject.setType(sourceType.toLowerCase(Locale.ROOT));
+                            geometryObject.setValue((String) 
row.getField(fieldIndex));
+                            statement.setObject(statementIndex, 
geometryObject);
+                        } else if (PG_INET.equalsIgnoreCase(sourceType)
                                 || PG_CIDR.equalsIgnoreCase(sourceType)
                                 || PG_MAC_ADDR.equalsIgnoreCase(sourceType)
                                 || PG_MAC_ADDR8.equalsIgnoreCase(sourceType)) {
@@ -290,7 +299,8 @@ public class PostgresJdbcRowConverter extends 
AbstractJdbcRowConverter {
                                 statement,
                                 seaTunnelDataType,
                                 statementIndex,
-                                sourceTypes[fieldIndex]);
+                                resolveSourceType(
+                                        rowType, fieldIndex, 
databaseTableSchema, sourceTypes));
                         break;
                     case BYTES:
                         statement.setBytes(statementIndex, (byte[]) 
row.getField(fieldIndex));
@@ -333,6 +343,23 @@ public class PostgresJdbcRowConverter extends 
AbstractJdbcRowConverter {
         return statement;
     }
 
+    @Nullable private String resolveSourceType(
+            SeaTunnelRowType rowType,
+            int fieldIndex,
+            @Nullable TableSchema databaseTableSchema,
+            String[] sourceTypes) {
+        if (databaseTableSchema != null) {
+            String fieldName = rowType.getFieldName(fieldIndex);
+            if (databaseTableSchema.contains(fieldName)) {
+                return 
databaseTableSchema.getColumn(fieldName).getSourceType();
+            }
+        }
+        if (fieldIndex < sourceTypes.length) {
+            return sourceTypes[fieldIndex];
+        }
+        return null;
+    }
+
     public String microsecondsToIntervalFormatVal(String intervalVal) {
         Duration duration = Duration.ofNanos(Long.parseLong(intervalVal) * 
1000);
         int days = (int) duration.toDays();
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverter.java
index 418b894b21..588b2df183 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverter.java
@@ -95,8 +95,8 @@ public class PostgresTypeConverter implements 
TypeConverter<BasicTypeDefine> {
     public static final String PG_JSONB = "jsonb";
     public static final String PG_XML = "xml";
     public static final String PG_UUID = "uuid";
-    private static final String PG_GEOMETRY = "geometry";
-    private static final String PG_GEOGRAPHY = "geography";
+    public static final String PG_GEOMETRY = "geometry";
+    public static final String PG_GEOGRAPHY = "geography";
     public static final String PG_DATE = "date";
     public static final String PG_INTERVAL = "interval";
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverterTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverterTest.java
index d18986cecf..acbdc8122d 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverterTest.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverterTest.java
@@ -28,8 +28,10 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
 import org.postgresql.util.PGobject;
 
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
@@ -38,7 +40,9 @@ import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class PostgresJdbcRowConverterTest {
@@ -185,4 +189,95 @@ public class PostgresJdbcRowConverterTest {
         Assertions.assertEquals(1, row.getField(0));
         Assertions.assertNull(row.getField(1), "geometry_col should be null");
     }
+
+    @Test
+    public void testToExternalWithGeometryType() throws SQLException {
+        TableSchema tableSchema =
+                createTableSchema("geometry_col", BasicType.STRING_TYPE, 
"geometry");
+
+        SeaTunnelRow row = new SeaTunnelRow(new Object[] {1, "0102FF"});
+        PreparedStatement statement = mock(PreparedStatement.class);
+
+        converter.toExternal(tableSchema, null, row, statement);
+
+        ArgumentCaptor<Object> captor = ArgumentCaptor.forClass(Object.class);
+        verify(statement).setObject(eq(2), captor.capture());
+
+        Object arg = captor.getValue();
+        Assertions.assertTrue(arg instanceof PGobject);
+        PGobject pg = (PGobject) arg;
+        Assertions.assertEquals("geometry", pg.getType());
+        Assertions.assertEquals("0102FF", pg.getValue());
+    }
+
+    @Test
+    public void testToExternalWithGeometryTypeFromDatabaseSchema() throws 
SQLException {
+        TableSchema writeSchema = createTableSchema("geometry_col", 
BasicType.STRING_TYPE, null);
+        TableSchema databaseSchema =
+                createTableSchema("geometry_col", BasicType.STRING_TYPE, 
"geometry");
+
+        SeaTunnelRow row = new SeaTunnelRow(new Object[] {1, "0102FF"});
+        PreparedStatement statement = mock(PreparedStatement.class);
+
+        converter.toExternal(writeSchema, databaseSchema, row, statement);
+
+        ArgumentCaptor<Object> captor = ArgumentCaptor.forClass(Object.class);
+        verify(statement).setObject(eq(2), captor.capture());
+
+        Object arg = captor.getValue();
+        Assertions.assertTrue(arg instanceof PGobject);
+        PGobject pg = (PGobject) arg;
+        Assertions.assertEquals("geometry", pg.getType());
+        Assertions.assertEquals("0102FF", pg.getValue());
+    }
+
+    @Test
+    public void testToInternalWithGeographyType() throws SQLException {
+        ResultSet rs = mock(ResultSet.class);
+        TableSchema tableSchema =
+                createTableSchema("geography_col", BasicType.STRING_TYPE, 
"GEOGRAPHY");
+
+        setupMockResultSet(rs, "INT4", "GEOGRAPHY", 1, "POINT(1 2)");
+
+        SeaTunnelRow row = converter.toInternal(rs, tableSchema);
+
+        Assertions.assertNotNull(row);
+        Assertions.assertEquals(1, row.getField(0));
+        Assertions.assertEquals("POINT(1 2)", row.getField(1));
+    }
+
+    @Test
+    public void testToInternalWithNullGeographyType() throws SQLException {
+        ResultSet rs = mock(ResultSet.class);
+        TableSchema tableSchema =
+                createTableSchema("geography_col", BasicType.STRING_TYPE, 
"GEOGRAPHY");
+
+        setupMockResultSet(rs, "INT4", "GEOGRAPHY", 1, null);
+
+        SeaTunnelRow row = converter.toInternal(rs, tableSchema);
+
+        Assertions.assertNotNull(row);
+        Assertions.assertEquals(1, row.getField(0));
+        Assertions.assertNull(row.getField(1), "geography_col should be null");
+    }
+
+    @Test
+    public void testToExternalWithGeographyType() throws SQLException {
+        TableSchema tableSchema =
+                createTableSchema("geography_col", BasicType.STRING_TYPE, 
"geography");
+
+        SeaTunnelRow row = new SeaTunnelRow(new Object[] {1, "0102FF"});
+        PreparedStatement statement = mock(PreparedStatement.class);
+
+        converter.toExternal(tableSchema, null, row, statement);
+
+        ArgumentCaptor<Object> captor = ArgumentCaptor.forClass(Object.class);
+        verify(statement).setObject(eq(2), captor.capture());
+
+        Object arg = captor.getValue();
+        Assertions.assertTrue(arg instanceof PGobject);
+        PGobject pg = (PGobject) arg;
+        Assertions.assertEquals("geography", pg.getType());
+        Assertions.assertEquals("0102FF", pg.getValue());
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql
index c823ac5a3d..bf28399f25 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql
@@ -49,6 +49,8 @@ CREATE TABLE postgres_cdc_table_1
     f_default_numeric   NUMERIC,
     f_numeric_no_scale  NUMERIC(24),
     f_inet              INET,
+    f_geometry          geometry(POINT, 4326),
+    f_geography         geography(POINT, 4326),
     PRIMARY KEY (id)
 );
 
@@ -101,6 +103,8 @@ CREATE TABLE sink_postgres_cdc_table_1
     f_default_numeric   NUMERIC,
     f_numeric_no_scale  NUMERIC(24),
     f_inet              INET,
+    f_geometry          geometry(POINT, 4326),
+    f_geography         geography(POINT, 4326),
     PRIMARY KEY (id)
 );
 
@@ -152,7 +156,9 @@ CREATE TABLE full_types_no_primary_key
     f_time              TIME(0),
     f_default_numeric   NUMERIC,
     f_numeric_no_scale  NUMERIC(24),
-    f_inet              INET
+    f_inet              INET,
+    f_geometry          geometry(POINT, 4326),
+    f_geography         geography(POINT, 4326)
 );
 
 CREATE TABLE full_types_no_primary_key_with_debezium
@@ -177,7 +183,9 @@ CREATE TABLE full_types_no_primary_key_with_debezium
     f_time              TIME(0),
     f_default_numeric   NUMERIC,
     f_numeric_no_scale  NUMERIC(24),
-    f_inet              INET
+    f_inet              INET,
+    f_geometry          geometry(POINT, 4326),
+    f_geography         geography(POINT, 4326)
 );
 
 CREATE TABLE postgres_cdc_table_3
@@ -272,7 +280,9 @@ ALTER TABLE full_types_no_primary_key_with_debezium
 INSERT INTO postgres_cdc_table_1
 VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true,
         'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123', 
'2020-07-17 18:00:22.123456',
-        '2020-07-17', '18:00:22', 500,88,'192.168.1.1');
+        '2020-07-17', '18:00:22', 500,88,'192.168.1.1',
+        ST_GeomFromText('POINT(-122.3452 47.5925)', 4326),
+        ST_GeographyFromText('POINT(-122.3452 47.5925)'));
 
 INSERT INTO postgres_cdc_table_2
 VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true,
@@ -291,9 +301,13 @@ VALUES (1, '2', 32767, INTERVAL '1 day 2 hours', 
'192.168.1.100', '192.168.1.0/2
 INSERT INTO full_types_no_primary_key
 VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true,
         'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123', 
'2020-07-17 18:00:22.123456',
-        '2020-07-17', '18:00:22', 500, 88,'192.168.1.1');
+        '2020-07-17', '18:00:22', 500, 88,'192.168.1.1',
+        ST_GeomFromText('POINT(-122.3452 47.5925)', 4326),
+        ST_GeographyFromText('POINT(-122.3452 47.5925)'));
 
 INSERT INTO full_types_no_primary_key_with_debezium
 VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true,
         'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123', 
'2020-07-17 18:00:22.123456',
-        '2020-07-17', '18:00:22', 500, 88,'192.168.1.1');
\ No newline at end of file
+        '2020-07-17', '18:00:22', 500, 88,'192.168.1.1',
+        ST_GeomFromText('POINT(-122.3452 47.5925)', 4326),
+        ST_GeographyFromText('POINT(-122.3452 47.5925)'));

Reply via email to