This is an automated email from the ASF dual-hosted git repository.
zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new f08ce7ba8a [Fix][Connector-V2][Postgres-CDC] Fix PostgreSQL GEOMETRY
handling with JDBC sink (#10186)
f08ce7ba8a is described below
commit f08ce7ba8a55863a6f110e07e5611523beb30fc7
Author: yzeng1618 <[email protected]>
AuthorDate: Thu Jan 1 08:34:08 2026 +0800
[Fix][Connector-V2][Postgres-CDC] Fix PostgreSQL GEOMETRY handling with
JDBC sink (#10186)
Co-authored-by: zengyi <[email protected]>
---
docs/en/connector-v2/sink/PostgreSql.md | 4 +-
docs/en/connector-v2/source/PostgreSQL-CDC.md | 2 +-
docs/zh/connector-v2/source/PostgreSQL-CDC.md | 4 +-
...TunnelRowDebeziumDeserializationConverters.java | 29 +++++++
...elRowDebeziumDeserializationConvertersTest.java | 79 ++++++++++++++++++
.../dialect/psql/PostgresJdbcRowConverter.java | 43 ++++++++--
.../dialect/psql/PostgresTypeConverter.java | 4 +-
.../dialect/psql/PostgresJdbcRowConverterTest.java | 95 ++++++++++++++++++++++
.../src/test/resources/ddl/inventory.sql | 24 ++++--
9 files changed, 264 insertions(+), 20 deletions(-)
diff --git a/docs/en/connector-v2/sink/PostgreSql.md
b/docs/en/connector-v2/sink/PostgreSql.md
index 6d7cfbaacf..290eb9517d 100644
--- a/docs/en/connector-v2/sink/PostgreSql.md
+++ b/docs/en/connector-v2/sink/PostgreSql.md
@@ -38,7 +38,7 @@ semantics (using XA transaction guarantee).
| Datasource | Supported Versions |
Driver | Url |
Maven |
|------------|------------------------------------------------------------|-----------------------|---------------------------------------|--------------------------------------------------------------------------|
| PostgreSQL | Different dependency version has different driver class. |
org.postgresql.Driver | jdbc:postgresql://localhost:5432/test |
[Download](https://mvnrepository.com/artifact/org.postgresql/postgresql) |
-| PostgreSQL | If you want to manipulate the GEOMETRY type in PostgreSQL. |
org.postgresql.Driver | jdbc:postgresql://localhost:5432/test |
[Download](https://mvnrepository.com/artifact/net.postgis/postgis-jdbc) |
+| PostgreSQL | If you want to manipulate the GEOMETRY/GEOGRAPHY type in
PostgreSQL. | org.postgresql.Driver | jdbc:postgresql://localhost:5432/test |
[Download](https://mvnrepository.com/artifact/net.postgis/postgis-jdbc) |
## Database Dependency
@@ -275,4 +275,4 @@ sink {
## Changelog
-<ChangeLog />
\ No newline at end of file
+<ChangeLog />
diff --git a/docs/en/connector-v2/source/PostgreSQL-CDC.md
b/docs/en/connector-v2/source/PostgreSQL-CDC.md
index 493a0c432e..db00d1c9d5 100644
--- a/docs/en/connector-v2/source/PostgreSQL-CDC.md
+++ b/docs/en/connector-v2/source/PostgreSQL-CDC.md
@@ -28,7 +28,7 @@ describes how to set up the Postgre CDC connector to run SQL
queries against Pos
| Datasource | Supported versions |
Driver | Url |
Maven |
|------------|------------------------------------------------------------|-----------------------|---------------------------------------|--------------------------------------------------------------------------|
| PostgreSQL | Different dependency version has different driver class. |
org.postgresql.Driver | jdbc:postgresql://localhost:5432/test |
[Download](https://mvnrepository.com/artifact/org.postgresql/postgresql) |
-| PostgreSQL | If you want to manipulate the GEOMETRY type in PostgreSQL. |
org.postgresql.Driver | jdbc:postgresql://localhost:5432/test |
[Download](https://mvnrepository.com/artifact/net.postgis/postgis-jdbc) |
+| PostgreSQL | If you want to manipulate the GEOMETRY/GEOGRAPHY type in
PostgreSQL. | org.postgresql.Driver | jdbc:postgresql://localhost:5432/test |
[Download](https://mvnrepository.com/artifact/net.postgis/postgis-jdbc) |
## Using Dependency
diff --git a/docs/zh/connector-v2/source/PostgreSQL-CDC.md
b/docs/zh/connector-v2/source/PostgreSQL-CDC.md
index 66e4767ae7..84f37fa9f5 100644
--- a/docs/zh/connector-v2/source/PostgreSQL-CDC.md
+++ b/docs/zh/connector-v2/source/PostgreSQL-CDC.md
@@ -27,7 +27,7 @@ Postgre CDC 连接器允许从 Postgre 数据库读取快照数据和增量数
| 数据源 | 支持的版本 | 驱动
| Url |
Maven |
|------------|-----------------------------------------------------|---------------------|---------------------------------------|--------------------------------------------------------------------------|
| PostgreSQL | 不同的依赖版本有不同的驱动类。 | org.postgresql.Driver |
jdbc:postgresql://localhost:5432/test |
[下载](https://mvnrepository.com/artifact/org.postgresql/postgresql) |
-| PostgreSQL | 如果您想在 PostgreSQL 中操作 GEOMETRY 类型。 |
org.postgresql.Driver | jdbc:postgresql://localhost:5432/test |
[下载](https://mvnrepository.com/artifact/net.postgis/postgis-jdbc) |
+| PostgreSQL | 如果您想在 PostgreSQL 中操作 GEOMETRY/GEOGRAPHY 类型。 |
org.postgresql.Driver | jdbc:postgresql://localhost:5432/test |
[下载](https://mvnrepository.com/artifact/net.postgis/postgis-jdbc) |
## 使用依赖
@@ -190,4 +190,4 @@ source {
## 变更日志
-<ChangeLog />
\ No newline at end of file
+<ChangeLog />
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConverters.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConverters.java
index 227d2b7eee..b5982736d5 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConverters.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConverters.java
@@ -36,6 +36,8 @@ import org.apache.kafka.connect.source.SourceRecord;
import io.debezium.data.SpecialValueDecimal;
import io.debezium.data.VariableScaleDecimal;
+import io.debezium.data.geometry.Geography;
+import io.debezium.data.geometry.Geometry;
import io.debezium.time.MicroTime;
import io.debezium.time.MicroTimestamp;
import io.debezium.time.NanoTime;
@@ -458,11 +460,38 @@ public class
SeaTunnelRowDebeziumDeserializationConverters implements Serializab
@Override
public Object convert(Object dbzObj, Schema schema) {
+ if (dbzObj == null) {
+ return null;
+ }
+
+ if (schema != null && schema.name() != null && dbzObj
instanceof Struct) {
+ String logicalName = schema.name();
+ if (Geometry.LOGICAL_NAME.equals(logicalName)
+ || Geography.LOGICAL_NAME.equals(logicalName)) {
+ return convertGeometryStructToHexWkb((Struct) dbzObj);
+ }
+ }
+
return dbzObj.toString();
}
};
}
+ private static String convertGeometryStructToHexWkb(Struct struct) {
+ Object wkbField = struct.get(Geometry.WKB_FIELD);
+ if (!(wkbField instanceof byte[])) {
+ // Fallback to default string representation if the expected field
is not present.
+ return struct.toString();
+ }
+
+ byte[] wkb = (byte[]) wkbField;
+ StringBuilder sb = new StringBuilder(wkb.length * 2);
+ for (byte b : wkb) {
+ sb.append(String.format("%02X", b));
+ }
+ return sb.toString();
+ }
+
private static DebeziumDeserializationConverter convertToBinary() {
return new DebeziumDeserializationConverter() {
private static final long serialVersionUID = 1L;
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConvertersTest.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConvertersTest.java
index 14098cecc9..c17243b2bd 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConvertersTest.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConvertersTest.java
@@ -34,6 +34,9 @@ import org.apache.kafka.connect.source.SourceRecord;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import io.debezium.data.geometry.Geography;
+import io.debezium.data.geometry.Geometry;
+
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
@@ -127,4 +130,80 @@ public class
SeaTunnelRowDebeziumDeserializationConvertersTest {
Arrays.equals(
doubles, (Double[])
(converter.convert(Arrays.asList(doubles), null))));
}
+
+ @Test
+ void testGeometryStringConversion() throws Exception {
+ SeaTunnelRowDebeziumDeserializationConverters converters =
+ new SeaTunnelRowDebeziumDeserializationConverters(
+ new SeaTunnelRowType(
+ new String[] {"geo"},
+ new SeaTunnelDataType[]
{BasicType.STRING_TYPE}),
+ new MetadataConverter[] {},
+ ZoneId.systemDefault(),
+ DebeziumDeserializationConverterFactory.DEFAULT);
+
+ byte[] wkb = new byte[] {0x01, 0x02, (byte) 0xFF};
+ Schema geometrySchema = Geometry.builder().optional().build();
+ Schema recordSchema = SchemaBuilder.struct().field("geo",
geometrySchema).build();
+
+ Struct geometryValue = Geometry.createValue(geometrySchema, wkb, 4549);
+ Struct recordValue = new Struct(recordSchema);
+ recordValue.put("geo", geometryValue);
+
+ SourceRecord record =
+ new SourceRecord(
+ new HashMap<>(),
+ new HashMap<>(),
+ "topicName",
+ null,
+ SchemaBuilder.int32().build(),
+ 1,
+ recordSchema,
+ recordValue,
+ null,
+ new ArrayList<>());
+
+ SeaTunnelRow row = converters.convert(record, recordValue,
recordSchema);
+ Object fieldValue = row.getField(0);
+ Assertions.assertTrue(fieldValue instanceof String);
+ Assertions.assertEquals("0102FF", fieldValue);
+ }
+
+ @Test
+ void testGeographyStringConversion() throws Exception {
+ SeaTunnelRowDebeziumDeserializationConverters converters =
+ new SeaTunnelRowDebeziumDeserializationConverters(
+ new SeaTunnelRowType(
+ new String[] {"geo"},
+ new SeaTunnelDataType[]
{BasicType.STRING_TYPE}),
+ new MetadataConverter[] {},
+ ZoneId.systemDefault(),
+ DebeziumDeserializationConverterFactory.DEFAULT);
+
+ byte[] wkb = new byte[] {0x01, 0x02, (byte) 0xFF};
+ Schema geographySchema = Geography.builder().optional().build();
+ Schema recordSchema = SchemaBuilder.struct().field("geo",
geographySchema).build();
+
+ Struct geographyValue = Geometry.createValue(geographySchema, wkb,
4549);
+ Struct recordValue = new Struct(recordSchema);
+ recordValue.put("geo", geographyValue);
+
+ SourceRecord record =
+ new SourceRecord(
+ new HashMap<>(),
+ new HashMap<>(),
+ "topicName",
+ null,
+ SchemaBuilder.int32().build(),
+ 1,
+ recordSchema,
+ recordValue,
+ null,
+ new ArrayList<>());
+
+ SeaTunnelRow row = converters.convert(record, recordValue,
recordSchema);
+ Object fieldValue = row.getField(0);
+ Assertions.assertTrue(fieldValue instanceof String);
+ Assertions.assertEquals("0102FF", fieldValue);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
index fd5f5d0d9c..a19c4814f1 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
@@ -58,6 +58,8 @@ import java.util.Locale;
import java.util.Optional;
import static
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeConverter.PG_CIDR;
+import static
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeConverter.PG_GEOGRAPHY;
+import static
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeConverter.PG_GEOMETRY;
import static
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeConverter.PG_INET;
import static
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeConverter.PG_INTERVAL;
import static
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeConverter.PG_MAC_ADDR;
@@ -66,9 +68,6 @@ import static
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.ps
@Slf4j
public class PostgresJdbcRowConverter extends AbstractJdbcRowConverter {
- private static final String PG_GEOMETRY = "GEOMETRY";
- private static final String PG_GEOGRAPHY = "GEOGRAPHY";
-
@Override
public String converterName() {
return DatabaseIdentifier.POSTGRESQL;
@@ -108,8 +107,8 @@ public class PostgresJdbcRowConverter extends
AbstractJdbcRowConverter {
rs.getMetaData().getColumnTypeName(resultSetIndex).toUpperCase(Locale.ROOT);
switch (seaTunnelDataType.getSqlType()) {
case STRING:
- if (metaDataColumnType.equals(PG_GEOMETRY)
- || metaDataColumnType.equals(PG_GEOGRAPHY)) {
+ if (PG_GEOMETRY.equalsIgnoreCase(metaDataColumnType)
+ ||
PG_GEOGRAPHY.equalsIgnoreCase(metaDataColumnType)) {
Object geoObj = rs.getObject(resultSetIndex);
fields[fieldIndex] = geoObj == null ? null :
geoObj.toString();
} else {
@@ -221,8 +220,18 @@ public class PostgresJdbcRowConverter extends
AbstractJdbcRowConverter {
switch (seaTunnelDataType.getSqlType()) {
case STRING:
- String sourceType = sourceTypes[fieldIndex];
- if (PG_INET.equalsIgnoreCase(sourceType)
+ String sourceType =
+ resolveSourceType(
+ rowType, fieldIndex,
databaseTableSchema, sourceTypes);
+ if (sourceType != null
+ && (PG_GEOMETRY.equalsIgnoreCase(sourceType)
+ ||
PG_GEOGRAPHY.equalsIgnoreCase(sourceType))) {
+ // handle PostGIS geometry/geography when
represented as string
+ PGobject geometryObject = new PGobject();
+
geometryObject.setType(sourceType.toLowerCase(Locale.ROOT));
+ geometryObject.setValue((String)
row.getField(fieldIndex));
+ statement.setObject(statementIndex,
geometryObject);
+ } else if (PG_INET.equalsIgnoreCase(sourceType)
|| PG_CIDR.equalsIgnoreCase(sourceType)
|| PG_MAC_ADDR.equalsIgnoreCase(sourceType)
|| PG_MAC_ADDR8.equalsIgnoreCase(sourceType)) {
@@ -290,7 +299,8 @@ public class PostgresJdbcRowConverter extends
AbstractJdbcRowConverter {
statement,
seaTunnelDataType,
statementIndex,
- sourceTypes[fieldIndex]);
+ resolveSourceType(
+ rowType, fieldIndex,
databaseTableSchema, sourceTypes));
break;
case BYTES:
statement.setBytes(statementIndex, (byte[])
row.getField(fieldIndex));
@@ -333,6 +343,23 @@ public class PostgresJdbcRowConverter extends
AbstractJdbcRowConverter {
return statement;
}
+ @Nullable private String resolveSourceType(
+ SeaTunnelRowType rowType,
+ int fieldIndex,
+ @Nullable TableSchema databaseTableSchema,
+ String[] sourceTypes) {
+ if (databaseTableSchema != null) {
+ String fieldName = rowType.getFieldName(fieldIndex);
+ if (databaseTableSchema.contains(fieldName)) {
+ return
databaseTableSchema.getColumn(fieldName).getSourceType();
+ }
+ }
+ if (fieldIndex < sourceTypes.length) {
+ return sourceTypes[fieldIndex];
+ }
+ return null;
+ }
+
public String microsecondsToIntervalFormatVal(String intervalVal) {
Duration duration = Duration.ofNanos(Long.parseLong(intervalVal) *
1000);
int days = (int) duration.toDays();
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverter.java
index 418b894b21..588b2df183 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverter.java
@@ -95,8 +95,8 @@ public class PostgresTypeConverter implements
TypeConverter<BasicTypeDefine> {
public static final String PG_JSONB = "jsonb";
public static final String PG_XML = "xml";
public static final String PG_UUID = "uuid";
- private static final String PG_GEOMETRY = "geometry";
- private static final String PG_GEOGRAPHY = "geography";
+ public static final String PG_GEOMETRY = "geometry";
+ public static final String PG_GEOGRAPHY = "geography";
public static final String PG_DATE = "date";
public static final String PG_INTERVAL = "interval";
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverterTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverterTest.java
index d18986cecf..acbdc8122d 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverterTest.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverterTest.java
@@ -28,8 +28,10 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
import org.postgresql.util.PGobject;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
@@ -38,7 +40,9 @@ import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
+import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class PostgresJdbcRowConverterTest {
@@ -185,4 +189,95 @@ public class PostgresJdbcRowConverterTest {
Assertions.assertEquals(1, row.getField(0));
Assertions.assertNull(row.getField(1), "geometry_col should be null");
}
+
+ @Test
+ public void testToExternalWithGeometryType() throws SQLException {
+ TableSchema tableSchema =
+ createTableSchema("geometry_col", BasicType.STRING_TYPE,
"geometry");
+
+ SeaTunnelRow row = new SeaTunnelRow(new Object[] {1, "0102FF"});
+ PreparedStatement statement = mock(PreparedStatement.class);
+
+ converter.toExternal(tableSchema, null, row, statement);
+
+ ArgumentCaptor<Object> captor = ArgumentCaptor.forClass(Object.class);
+ verify(statement).setObject(eq(2), captor.capture());
+
+ Object arg = captor.getValue();
+ Assertions.assertTrue(arg instanceof PGobject);
+ PGobject pg = (PGobject) arg;
+ Assertions.assertEquals("geometry", pg.getType());
+ Assertions.assertEquals("0102FF", pg.getValue());
+ }
+
+ @Test
+ public void testToExternalWithGeometryTypeFromDatabaseSchema() throws
SQLException {
+ TableSchema writeSchema = createTableSchema("geometry_col",
BasicType.STRING_TYPE, null);
+ TableSchema databaseSchema =
+ createTableSchema("geometry_col", BasicType.STRING_TYPE,
"geometry");
+
+ SeaTunnelRow row = new SeaTunnelRow(new Object[] {1, "0102FF"});
+ PreparedStatement statement = mock(PreparedStatement.class);
+
+ converter.toExternal(writeSchema, databaseSchema, row, statement);
+
+ ArgumentCaptor<Object> captor = ArgumentCaptor.forClass(Object.class);
+ verify(statement).setObject(eq(2), captor.capture());
+
+ Object arg = captor.getValue();
+ Assertions.assertTrue(arg instanceof PGobject);
+ PGobject pg = (PGobject) arg;
+ Assertions.assertEquals("geometry", pg.getType());
+ Assertions.assertEquals("0102FF", pg.getValue());
+ }
+
+ @Test
+ public void testToInternalWithGeographyType() throws SQLException {
+ ResultSet rs = mock(ResultSet.class);
+ TableSchema tableSchema =
+ createTableSchema("geography_col", BasicType.STRING_TYPE,
"GEOGRAPHY");
+
+ setupMockResultSet(rs, "INT4", "GEOGRAPHY", 1, "POINT(1 2)");
+
+ SeaTunnelRow row = converter.toInternal(rs, tableSchema);
+
+ Assertions.assertNotNull(row);
+ Assertions.assertEquals(1, row.getField(0));
+ Assertions.assertEquals("POINT(1 2)", row.getField(1));
+ }
+
+ @Test
+ public void testToInternalWithNullGeographyType() throws SQLException {
+ ResultSet rs = mock(ResultSet.class);
+ TableSchema tableSchema =
+ createTableSchema("geography_col", BasicType.STRING_TYPE,
"GEOGRAPHY");
+
+ setupMockResultSet(rs, "INT4", "GEOGRAPHY", 1, null);
+
+ SeaTunnelRow row = converter.toInternal(rs, tableSchema);
+
+ Assertions.assertNotNull(row);
+ Assertions.assertEquals(1, row.getField(0));
+ Assertions.assertNull(row.getField(1), "geography_col should be null");
+ }
+
+ @Test
+ public void testToExternalWithGeographyType() throws SQLException {
+ TableSchema tableSchema =
+ createTableSchema("geography_col", BasicType.STRING_TYPE,
"geography");
+
+ SeaTunnelRow row = new SeaTunnelRow(new Object[] {1, "0102FF"});
+ PreparedStatement statement = mock(PreparedStatement.class);
+
+ converter.toExternal(tableSchema, null, row, statement);
+
+ ArgumentCaptor<Object> captor = ArgumentCaptor.forClass(Object.class);
+ verify(statement).setObject(eq(2), captor.capture());
+
+ Object arg = captor.getValue();
+ Assertions.assertTrue(arg instanceof PGobject);
+ PGobject pg = (PGobject) arg;
+ Assertions.assertEquals("geography", pg.getType());
+ Assertions.assertEquals("0102FF", pg.getValue());
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql
index c823ac5a3d..bf28399f25 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql
@@ -49,6 +49,8 @@ CREATE TABLE postgres_cdc_table_1
f_default_numeric NUMERIC,
f_numeric_no_scale NUMERIC(24),
f_inet INET,
+ f_geometry geometry(POINT, 4326),
+ f_geography geography(POINT, 4326),
PRIMARY KEY (id)
);
@@ -101,6 +103,8 @@ CREATE TABLE sink_postgres_cdc_table_1
f_default_numeric NUMERIC,
f_numeric_no_scale NUMERIC(24),
f_inet INET,
+ f_geometry geometry(POINT, 4326),
+ f_geography geography(POINT, 4326),
PRIMARY KEY (id)
);
@@ -152,7 +156,9 @@ CREATE TABLE full_types_no_primary_key
f_time TIME(0),
f_default_numeric NUMERIC,
f_numeric_no_scale NUMERIC(24),
- f_inet INET
+ f_inet INET,
+ f_geometry geometry(POINT, 4326),
+ f_geography geography(POINT, 4326)
);
CREATE TABLE full_types_no_primary_key_with_debezium
@@ -177,7 +183,9 @@ CREATE TABLE full_types_no_primary_key_with_debezium
f_time TIME(0),
f_default_numeric NUMERIC,
f_numeric_no_scale NUMERIC(24),
- f_inet INET
+ f_inet INET,
+ f_geometry geometry(POINT, 4326),
+ f_geography geography(POINT, 4326)
);
CREATE TABLE postgres_cdc_table_3
@@ -272,7 +280,9 @@ ALTER TABLE full_types_no_primary_key_with_debezium
INSERT INTO postgres_cdc_table_1
VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true,
'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123',
'2020-07-17 18:00:22.123456',
- '2020-07-17', '18:00:22', 500,88,'192.168.1.1');
+ '2020-07-17', '18:00:22', 500,88,'192.168.1.1',
+ ST_GeomFromText('POINT(-122.3452 47.5925)', 4326),
+ ST_GeographyFromText('POINT(-122.3452 47.5925)'));
INSERT INTO postgres_cdc_table_2
VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true,
@@ -291,9 +301,13 @@ VALUES (1, '2', 32767, INTERVAL '1 day 2 hours',
'192.168.1.100', '192.168.1.0/2
INSERT INTO full_types_no_primary_key
VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true,
'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123',
'2020-07-17 18:00:22.123456',
- '2020-07-17', '18:00:22', 500, 88,'192.168.1.1');
+ '2020-07-17', '18:00:22', 500, 88,'192.168.1.1',
+ ST_GeomFromText('POINT(-122.3452 47.5925)', 4326),
+ ST_GeographyFromText('POINT(-122.3452 47.5925)'));
INSERT INTO full_types_no_primary_key_with_debezium
VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true,
'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123',
'2020-07-17 18:00:22.123456',
- '2020-07-17', '18:00:22', 500, 88,'192.168.1.1');
\ No newline at end of file
+ '2020-07-17', '18:00:22', 500, 88,'192.168.1.1',
+ ST_GeomFromText('POINT(-122.3452 47.5925)', 4326),
+ ST_GeographyFromText('POINT(-122.3452 47.5925)'));