This is an automated email from the ASF dual-hosted git repository.
zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 30b0a08495 [Feature][Connector-V2] Postgresql support read TIMESTAMP_TZ (#10048)
30b0a08495 is described below
commit 30b0a0849555e2ac1bb489a19348a93867e99e7e
Author: yzeng1618 <[email protected]>
AuthorDate: Sat Nov 29 11:09:33 2025 +0800
[Feature][Connector-V2] Postgresql support read TIMESTAMP_TZ (#10048)
Co-authored-by: zengyi <[email protected]>
---
.../apache/seatunnel/api/table/type/BasicType.java | 3 +
.../converter/AbstractJdbcRowConverter.java | 14 +-
.../dialect/psql/PostgresJdbcRowConverter.java | 151 ++++++++++++++++-
.../dialect/psql/PostgresTypeConverter.java | 23 ++-
.../CopyManagerBatchStatementExecutor.java | 10 ++
.../seatunnel/jdbc/utils/JdbcFieldTypeUtils.java | 121 ++++++++++++-
.../kingbase/KingbaseTypeConverterTest.java | 4 +-
.../dialect/psql/PostgresJdbcRowConverterTest.java | 188 +++++++++++++++++++++
.../dialect/psql/PostgresTypeConverterTest.java | 74 +++++++-
.../jdbc/utils/JdbcFieldTypeUtilsTest.java | 87 ++++++++++
.../connectors/seatunnel/jdbc/JdbcPostgresIT.java | 21 ++-
.../resources/jdbc_postgres_source_and_sink.conf | 4 +-
.../jdbc_postgres_source_and_sink_copy_stmt.conf | 4 +-
.../jdbc_postgres_source_and_sink_parallel.conf | 8 +-
...tgres_source_and_sink_parallel_upper_lower.conf | 8 +-
.../jdbc_postgres_source_and_sink_xa.conf | 8 +-
16 files changed, 697 insertions(+), 31 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java
index 67f24ef412..1c05672947 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.api.table.type;
+import java.time.OffsetDateTime;
import java.util.Objects;
public class BasicType<T> implements SeaTunnelDataType<T> {
@@ -35,6 +36,8 @@ public class BasicType<T> implements SeaTunnelDataType<T> {
public static final BasicType<Double> DOUBLE_TYPE =
new BasicType<>(Double.class, SqlType.DOUBLE);
public static final BasicType<Void> VOID_TYPE = new
BasicType<>(Void.class, SqlType.NULL);
+ public static final LocalTimeType<OffsetDateTime> OFFSET_DATE_TIME_TYPE =
+ LocalTimeType.OFFSET_DATE_TIME_TYPE;
//
--------------------------------------------------------------------------------------------
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
index d5c3aa95ac..c97eeeec04 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
@@ -120,6 +120,11 @@ public abstract class AbstractJdbcRowConverter implements
JdbcRowConverter {
.map(e -> e.toLocalDateTime())
.orElse(null);
break;
+ case TIMESTAMP_TZ:
+ OffsetDateTime offsetDateTime =
+ JdbcFieldTypeUtils.getOffsetDateTime(rs,
resultSetIndex);
+ fields[fieldIndex] = offsetDateTime;
+ break;
case BYTES:
fields[fieldIndex] = JdbcFieldTypeUtils.getBytes(rs,
resultSetIndex);
break;
@@ -278,7 +283,14 @@ public abstract class AbstractJdbcRowConverter implements
JdbcRowConverter {
break;
case TIMESTAMP_TZ:
OffsetDateTime offsetDateTime = (OffsetDateTime) value;
- statement.setTimestamp(statementIndex,
Timestamp.from(offsetDateTime.toInstant()));
+ try {
+ // Try to use setObject first for better timezone support
+ statement.setObject(statementIndex, offsetDateTime);
+ } catch (SQLException e) {
+ // Fallback to setTimestamp if setObject is not supported
+ statement.setTimestamp(
+ statementIndex,
Timestamp.from(offsetDateTime.toInstant()));
+ }
break;
case BYTES:
statement.setBytes(statementIndex, (byte[]) value);
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
index aef78499ce..fd5f5d0d9c 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql;
+import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.shade.org.apache.commons.lang3.math.NumberUtils;
import org.apache.seatunnel.api.table.catalog.Column;
@@ -35,6 +36,8 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcFieldTypeUtils;
import org.postgresql.util.PGobject;
+import lombok.extern.slf4j.Slf4j;
+
import javax.annotation.Nullable;
import java.math.BigDecimal;
@@ -50,6 +53,7 @@ import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
import java.util.Locale;
import java.util.Optional;
@@ -59,6 +63,7 @@ import static
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.ps
import static
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeConverter.PG_MAC_ADDR;
import static
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeConverter.PG_MAC_ADDR8;
+@Slf4j
public class PostgresJdbcRowConverter extends AbstractJdbcRowConverter {
private static final String PG_GEOMETRY = "GEOMETRY";
@@ -69,6 +74,29 @@ public class PostgresJdbcRowConverter extends
AbstractJdbcRowConverter {
return DatabaseIdentifier.POSTGRESQL;
}
+ @Override
+ protected void setValueToStatementByDataType(
+ Object value,
+ PreparedStatement statement,
+ SeaTunnelDataType<?> seaTunnelDataType,
+ int statementIndex,
+ @Nullable String sourceType)
+ throws SQLException {
+ if (seaTunnelDataType.getSqlType().equals(SqlType.TIMESTAMP_TZ)) {
+ if (value == null) {
+ statement.setNull(statementIndex,
java.sql.Types.TIMESTAMP_WITH_TIMEZONE);
+ } else {
+ PGobject timestampTzObject = new PGobject();
+ timestampTzObject.setType("timestamptz");
+ timestampTzObject.setValue(((OffsetDateTime)
value).toString());
+ statement.setObject(statementIndex, timestampTzObject);
+ }
+ return;
+ }
+ super.setValueToStatementByDataType(
+ value, statement, seaTunnelDataType, statementIndex,
sourceType);
+ }
+
@Override
public SeaTunnelRow toInternal(ResultSet rs, TableSchema tableSchema)
throws SQLException {
SeaTunnelRowType typeInfo = tableSchema.toPhysicalRowDataType();
@@ -82,10 +110,8 @@ public class PostgresJdbcRowConverter extends
AbstractJdbcRowConverter {
case STRING:
if (metaDataColumnType.equals(PG_GEOMETRY)
|| metaDataColumnType.equals(PG_GEOGRAPHY)) {
- fields[fieldIndex] =
- rs.getObject(resultSetIndex) == null
- ? null
- :
rs.getObject(resultSetIndex).toString();
+ Object geoObj = rs.getObject(resultSetIndex);
+ fields[fieldIndex] = geoObj == null ? null :
geoObj.toString();
} else {
fields[fieldIndex] = JdbcFieldTypeUtils.getString(rs,
resultSetIndex);
}
@@ -131,6 +157,10 @@ public class PostgresJdbcRowConverter extends
AbstractJdbcRowConverter {
.map(e -> e.toLocalDateTime())
.orElse(null);
break;
+ case TIMESTAMP_TZ:
+ // Enhanced PostgreSQL TIMESTAMP_TZ handling
+ fields[fieldIndex] = getPostgresOffsetDateTime(rs,
resultSetIndex);
+ break;
case BYTES:
fields[fieldIndex] = JdbcFieldTypeUtils.getBytes(rs,
resultSetIndex);
break;
@@ -255,9 +285,12 @@ public class PostgresJdbcRowConverter extends
AbstractJdbcRowConverter {
statementIndex,
java.sql.Timestamp.valueOf(localDateTime));
break;
case TIMESTAMP_TZ:
- OffsetDateTime offsetDateTime = (OffsetDateTime)
row.getField(fieldIndex);
- statement.setTimestamp(
- statementIndex,
Timestamp.from(offsetDateTime.toInstant()));
+ setValueToStatementByDataType(
+ row.getField(fieldIndex),
+ statement,
+ seaTunnelDataType,
+ statementIndex,
+ sourceTypes[fieldIndex]);
break;
case BYTES:
statement.setBytes(statementIndex, (byte[])
row.getField(fieldIndex));
@@ -316,4 +349,108 @@ public class PostgresJdbcRowConverter extends
AbstractJdbcRowConverter {
if (seconds > 0) sb.append(seconds).append(" seconds");
return sb.toString().trim();
}
+
+ private OffsetDateTime getPostgresOffsetDateTime(ResultSet rs, int
columnIndex)
+ throws SQLException {
+ // Read the value once to avoid drivers returning null on subsequent
reads
+ final Object obj = rs.getObject(columnIndex);
+
+ if (obj == null) {
+ return null;
+ }
+
+ // Direct types
+ if (obj instanceof OffsetDateTime) {
+ return (OffsetDateTime) obj;
+ }
+ if (obj instanceof Timestamp) {
+ return ((Timestamp) obj).toInstant().atOffset(ZoneOffset.UTC);
+ }
+ if (obj instanceof java.time.ZonedDateTime) {
+ return ((java.time.ZonedDateTime) obj).toOffsetDateTime();
+ }
+ if (obj instanceof java.util.Date) {
+ return ((java.util.Date) obj).toInstant().atOffset(ZoneOffset.UTC);
+ }
+
+ // Remaining PostgreSQL-specific or driver types: fall back to string
representation
+ return parseTimestampFromObjectString(obj);
+ }
+
+ private OffsetDateTime parsePostgresTimestampTz(String str) throws
SQLException {
+ String normalized = normalizeIsoTimestamp(str);
+ if (normalized == null) {
+ return null;
+ }
+
+ try {
+ return OffsetDateTime.parse(normalized);
+ } catch (Exception primary) {
+ log.debug("Failed to parse PostgreSQL timestamptz as ISO-8601:
{}", str, primary);
+ try {
+ String withoutOffset =
+
normalized.replaceFirst("([+-]\\d{2}:?\\d{2}|\\s+UTC|[zZ])$", "");
+ String fallback = withoutOffset.replace('T', ' ').trim();
+ Timestamp ts = Timestamp.valueOf(fallback);
+ return ts.toInstant().atOffset(ZoneOffset.UTC);
+ } catch (Exception secondary) {
+ log.debug(
+ "Failed to parse PostgreSQL timestamptz as UTC
timestamp: {}",
+ str,
+ secondary);
+ throw new SQLException(
+ "Failed to parse PostgreSQL timestamptz string: " +
str, secondary);
+ }
+ }
+ }
+
+ @Nullable private OffsetDateTime parseTimestampFromObjectString(Object
obj) throws SQLException {
+ final String str;
+ try {
+ str = String.valueOf(obj);
+ } catch (Throwable e) {
+ log.debug(
+ "Failed to get PostgreSQL timestamp object string
representation from class: {}",
+ obj.getClass().getName(),
+ e);
+ return null;
+ }
+ return parsePostgresTimestampTz(str);
+ }
+
+ private String normalizeIsoTimestamp(String value) {
+ // PostgreSQL timestamptz format examples:
+ // "2023-12-25 10:30:45.123456+08:00"
+ // "2023-12-25 10:30:45+08"
+ // "2023-12-25 10:30:45.123456 UTC"
+ String normalized = StringUtils.trimToNull(value);
+ if (normalized == null) {
+ return null;
+ }
+ // Handle UTC timezone
+ if (normalized.endsWith(" UTC")) {
+ normalized = normalized.substring(0, normalized.length() - 4) +
"Z";
+ }
+ // Normalize to ISO-8601 format examples:
+ // "2024-01-01T10:15:30+08:00"
+ // "2024-01-01T10:15:30Z"
+ normalized = normalized.replace(' ', 'T');
+ if (!normalized.isEmpty()) {
+ char lastChar = normalized.charAt(normalized.length() - 1);
+ if (lastChar == 'z' || lastChar == 'Z') {
+ normalized = normalized.substring(0, normalized.length() - 1)
+ "Z";
+ }
+ }
+ // Add colon to offsets like +HH -> +HH:00
+ if (normalized.matches(".*[+-]\\d{2}$")) {
+ return normalized + ":00";
+ }
+ if (normalized.matches(".*[+-]\\d{4}$")) {
+ // Add colon to offsets like +HHMM -> +HH:MM
+ return normalized.substring(0, normalized.length() - 2)
+ + ":"
+ + normalized.substring(normalized.length() - 2);
+ }
+ return normalized;
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverter.java
index f472d3bce5..418b894b21 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverter.java
@@ -262,7 +262,6 @@ public class PostgresTypeConverter implements
TypeConverter<BasicTypeDefine> {
}
break;
case PG_TIMESTAMP:
- case PG_TIMESTAMP_TZ:
builder.dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE);
if (typeDefine.getScale() != null && typeDefine.getScale() >
MAX_TIMESTAMP_SCALE) {
builder.scale(MAX_TIMESTAMP_SCALE);
@@ -274,6 +273,15 @@ public class PostgresTypeConverter implements
TypeConverter<BasicTypeDefine> {
builder.scale(typeDefine.getScale());
}
break;
+ case PG_TIMESTAMP_TZ:
+ // timestamptz -> TIMESTAMP_TZ
+ builder.dataType(LocalTimeType.OFFSET_DATE_TIME_TYPE);
+ if (typeDefine.getScale() != null && typeDefine.getScale() >
MAX_TIMESTAMP_SCALE) {
+ builder.scale(MAX_TIMESTAMP_SCALE);
+ } else {
+ builder.scale(typeDefine.getScale());
+ }
+ break;
default:
throw CommonError.convertToSeaTunnelTypeError(
identifier(), typeDefine.getDataType(),
typeDefine.getName());
@@ -444,6 +452,19 @@ public class PostgresTypeConverter implements
TypeConverter<BasicTypeDefine> {
builder.dataType(PG_TIMESTAMP);
builder.scale(timestampScale);
break;
+ case TIMESTAMP_TZ:
+ Integer timestampTzScale = column.getScale();
+ if (timestampTzScale != null && timestampTzScale >
MAX_TIMESTAMP_SCALE) {
+ timestampTzScale = MAX_TIMESTAMP_SCALE;
+ }
+ String timestampTzColumnType =
+ (timestampTzScale != null && timestampTzScale > 0)
+ ? String.format("%s(%s)", PG_TIMESTAMP_TZ,
timestampTzScale)
+ : PG_TIMESTAMP_TZ;
+ builder.columnType(timestampTzColumnType);
+ builder.dataType(PG_TIMESTAMP_TZ);
+ builder.scale(timestampTzScale);
+ break;
case ARRAY:
ArrayType arrayType = (ArrayType) column.getDataType();
SeaTunnelDataType elementType = arrayType.getElementType();
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/CopyManagerBatchStatementExecutor.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/CopyManagerBatchStatementExecutor.java
index b485d39de1..45f5b0996b 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/CopyManagerBatchStatementExecutor.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/CopyManagerBatchStatementExecutor.java
@@ -37,6 +37,7 @@ import java.sql.SQLException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
+import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.List;
@@ -145,6 +146,15 @@ public class CopyManagerBatchStatementExecutor implements
JdbcBatchStatementExec
LocalDateTime localDateTime = (LocalDateTime)
record.getField(fieldIndex);
csvRecord.add((java.sql.Timestamp)
java.sql.Timestamp.valueOf(localDateTime));
break;
+ case TIMESTAMP_TZ:
+ OffsetDateTime offsetDateTime = (OffsetDateTime)
record.getField(fieldIndex);
+ if (offsetDateTime != null) {
+ String timestampTzStr =
offsetDateTime.toString().replace('T', ' ');
+ csvRecord.add(timestampTzStr);
+ } else {
+ csvRecord.add(null);
+ }
+ break;
case BYTES:
csvRecord.add(
org.apache.commons.codec.binary.Base64.encodeBase64String(
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcFieldTypeUtils.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcFieldTypeUtils.java
index 8944824779..9287451171 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcFieldTypeUtils.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcFieldTypeUtils.java
@@ -22,6 +22,11 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeParseException;
public final class JdbcFieldTypeUtils {
@@ -95,12 +100,126 @@ public final class JdbcFieldTypeUtils {
return resultSet.getBytes(columnIndex);
}
+ public static OffsetDateTime getOffsetDateTime(ResultSet resultSet, int
columnIndex)
+ throws SQLException {
+ final Object obj = resultSet.getObject(columnIndex);
+ if (obj == null) {
+ return null;
+ }
+
+ // Handle OffsetDateTime directly
+ if (obj instanceof OffsetDateTime) {
+ return (OffsetDateTime) obj;
+ }
+
+ // Handle ZonedDateTime
+ if (obj instanceof ZonedDateTime) {
+ return ((ZonedDateTime) obj).toOffsetDateTime();
+ }
+
+ // Handle Instant
+ if (obj instanceof Instant) {
+ return ((Instant) obj).atOffset(ZoneOffset.UTC);
+ }
+
+ // Handle java.sql.Timestamp
+ if (obj instanceof Timestamp) {
+ return ((Timestamp) obj).toInstant().atOffset(ZoneOffset.UTC);
+ }
+
+ // Handle java.util.Date
+ if (obj instanceof java.util.Date) {
+ return ((java.util.Date) obj).toInstant().atOffset(ZoneOffset.UTC);
+ }
+
+ // Handle Long (epoch milliseconds)
+ if (obj instanceof Long) {
+ return Instant.ofEpochMilli((Long) obj).atOffset(ZoneOffset.UTC);
+ }
+
+ // Try to parse as string
+ String str = obj.toString();
+ try {
+ return parseOffsetDateTimeFromString(str);
+ } catch (Exception e) {
+ throw new SQLException(
+ "Failed to parse OffsetDateTime value: "
+ + str
+ + " (class: "
+ + obj.getClass().getName()
+ + ")",
+ e);
+ }
+ }
+
+ public static OffsetDateTime parseOffsetDateTimeFromString(String str)
+ throws DateTimeParseException {
+ String trimmed = str.trim();
+ // Treat empty string as "no value"
+ if (trimmed.isEmpty()) {
+ return null;
+ }
+ // Try parsing as standard ISO-8601 OffsetDateTime
+ OffsetDateTime directParsed = tryParseOffsetDateTime(trimmed);
+ if (directParsed != null) {
+ return directParsed;
+ }
+ // Normalize common relaxed forms and try again
+ String normalized = normalizeOffsetDateTimeString(trimmed);
+ OffsetDateTime normalizedParsed = tryParseOffsetDateTime(normalized);
+ if (normalizedParsed != null) {
+ return normalizedParsed;
+ }
+ // Finally, try parsing as ZonedDateTime and convert to OffsetDateTime
+ OffsetDateTime zonedParsed = tryParseZonedDateTime(trimmed);
+ if (zonedParsed != null) {
+ return zonedParsed;
+ }
+
+ throw new DateTimeParseException(
+ "Unable to parse OffsetDateTime from string: " + str, trimmed,
0);
+ }
+
+ private static OffsetDateTime tryParseOffsetDateTime(String value) {
+ try {
+ return OffsetDateTime.parse(value);
+ } catch (DateTimeParseException ignore) {
+ return null;
+ }
+ }
+
+ private static OffsetDateTime tryParseZonedDateTime(String value) {
+ try {
+ return ZonedDateTime.parse(value).toOffsetDateTime();
+ } catch (DateTimeParseException ignore) {
+ return null;
+ }
+ }
+
+ private static String normalizeOffsetDateTimeString(String value) {
+ String normalized = value;
+ if (normalized.endsWith(" UTC")) {
+ normalized = normalized.substring(0, normalized.length() - 4) +
"Z";
+ }
+ normalized = normalized.replace(' ', 'T');
+ if (normalized.matches(".*[+-]\\d{2}$")) {
+ normalized = normalized + ":00";
+ } else if (normalized.matches(".*[+-]\\d{4}$")) {
+ normalized =
+ normalized.substring(0, normalized.length() - 2)
+ + ":"
+ + normalized.substring(normalized.length() - 2);
+ }
+ return normalized;
+ }
+
private static <T> T getNullableValue(
ResultSet resultSet,
int columnIndex,
ThrowingFunction<ResultSet, T, SQLException> getter)
throws SQLException {
- if (resultSet.getObject(columnIndex) == null) {
+ final Object obj = resultSet.getObject(columnIndex);
+ if (obj == null) {
return null;
}
return getter.apply(resultSet, columnIndex);
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseTypeConverterTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseTypeConverterTest.java
index 11c99b41a8..17176db809 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseTypeConverterTest.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseTypeConverterTest.java
@@ -374,7 +374,7 @@ public class KingbaseTypeConverterTest {
.build();
column = KingbaseTypeConverter.INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
- Assertions.assertEquals(LocalTimeType.LOCAL_DATE_TIME_TYPE,
column.getDataType());
+ Assertions.assertEquals(LocalTimeType.OFFSET_DATE_TIME_TYPE,
column.getDataType());
Assertions.assertEquals(typeDefine.getScale(), column.getScale());
Assertions.assertEquals(typeDefine.getColumnType(),
column.getSourceType().toLowerCase());
@@ -387,7 +387,7 @@ public class KingbaseTypeConverterTest {
.build();
column = KingbaseTypeConverter.INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
- Assertions.assertEquals(LocalTimeType.LOCAL_DATE_TIME_TYPE,
column.getDataType());
+ Assertions.assertEquals(LocalTimeType.OFFSET_DATE_TIME_TYPE,
column.getDataType());
Assertions.assertEquals(typeDefine.getScale(), column.getScale());
Assertions.assertEquals(typeDefine.getColumnType(),
column.getSourceType().toLowerCase());
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverterTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverterTest.java
new file mode 100644
index 0000000000..d18986cecf
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverterTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql;
+
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.postgresql.util.PGobject;
+
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class PostgresJdbcRowConverterTest {
+
+ private PostgresJdbcRowConverter converter;
+
+ @BeforeEach
+ public void setUp() {
+ converter = new PostgresJdbcRowConverter();
+ }
+
+ // Helper methods for test setup
+ private TableSchema createTableSchema(
+ String col2Name, Object col2DataType, String col2SourceType) {
+ List<Column> columns = new ArrayList<>();
+
columns.add(PhysicalColumn.builder().name("id").dataType(BasicType.INT_TYPE).build());
+ PhysicalColumn.PhysicalColumnBuilder builder =
+ PhysicalColumn.builder()
+ .name(col2Name)
+ .dataType((SeaTunnelDataType<?>) col2DataType);
+ if (col2SourceType != null) {
+ builder.sourceType(col2SourceType);
+ }
+ columns.add(builder.build());
+ return TableSchema.builder().columns(columns).build();
+ }
+
+ private void setupMockResultSet(
+ ResultSet rs, String col1Type, String col2Type, Object col1Value,
Object col2Value)
+ throws SQLException {
+ ResultSetMetaData metaData = mock(ResultSetMetaData.class);
+ when(rs.getMetaData()).thenReturn(metaData);
+ when(metaData.getColumnCount()).thenReturn(2);
+ when(metaData.getColumnTypeName(1)).thenReturn(col1Type);
+ when(metaData.getColumnTypeName(2)).thenReturn(col2Type);
+ // Handle multiple calls to getObject() - return same value each time
+ when(rs.getObject(1)).thenReturn(col1Value, col1Value);
+ when(rs.getObject(2)).thenReturn(col2Value, col2Value);
+ // Configure getInt() for INT type columns
+ if (col1Value instanceof Integer) {
+ when(rs.getInt(1)).thenReturn((Integer) col1Value);
+ }
+ }
+
+ private void assertOffsetDateTime(
+ OffsetDateTime offsetDateTime,
+ int year,
+ int month,
+ int day,
+ int hour,
+ int minute,
+ ZoneOffset offset) {
+ Assertions.assertEquals(year, offsetDateTime.getYear());
+ Assertions.assertEquals(month, offsetDateTime.getMonthValue());
+ Assertions.assertEquals(day, offsetDateTime.getDayOfMonth());
+ Assertions.assertEquals(hour, offsetDateTime.getHour());
+ Assertions.assertEquals(minute, offsetDateTime.getMinute());
+ Assertions.assertEquals(offset, offsetDateTime.getOffset());
+ }
+
+ @Test
+ public void testToInternalWithTimestampTzFromPGobject() throws
SQLException {
+ ResultSet rs = mock(ResultSet.class);
+ TableSchema tableSchema =
+ createTableSchema("timestamp_tz_col",
LocalTimeType.OFFSET_DATE_TIME_TYPE, null);
+
+ PGobject pgObject = new PGobject();
+ pgObject.setType("timestamptz");
+ pgObject.setValue("2023-05-07 14:30:00+08:00");
+
+ setupMockResultSet(rs, "INT4", "TIMESTAMPTZ", 1, pgObject);
+
+ SeaTunnelRow row = converter.toInternal(rs, tableSchema);
+
+ Assertions.assertNotNull(row);
+ Assertions.assertEquals(1, row.getField(0));
+
+ OffsetDateTime offsetDateTime = (OffsetDateTime) row.getField(1);
+ Assertions.assertNotNull(
+ offsetDateTime, "timestamp_tz_col should not be null when
reading from PGobject");
+ assertOffsetDateTime(offsetDateTime, 2023, 5, 7, 14, 30,
ZoneOffset.ofHours(8));
+ }
+
+ @Test
+ public void testToInternalWithTimestampTzFromString() throws SQLException {
+ ResultSet rs = mock(ResultSet.class);
+ TableSchema tableSchema =
+ createTableSchema("timestamp_tz_col",
LocalTimeType.OFFSET_DATE_TIME_TYPE, null);
+
+ setupMockResultSet(rs, "INT4", "TIMESTAMPTZ", 1, "2023-05-07
14:30:00+08:00");
+
+ SeaTunnelRow row = converter.toInternal(rs, tableSchema);
+
+ Assertions.assertNotNull(row);
+ Assertions.assertEquals(1, row.getField(0));
+
+ OffsetDateTime offsetDateTime = (OffsetDateTime) row.getField(1);
+ Assertions.assertNotNull(
+ offsetDateTime, "timestamp_tz_col should not be null when
reading from string");
+ assertOffsetDateTime(offsetDateTime, 2023, 5, 7, 14, 30,
ZoneOffset.ofHours(8));
+ }
+
+ @Test
+ public void testToInternalWithNullTimestampTz() throws SQLException {
+ ResultSet rs = mock(ResultSet.class);
+ TableSchema tableSchema =
+ createTableSchema("timestamp_tz_col",
LocalTimeType.OFFSET_DATE_TIME_TYPE, null);
+
+ setupMockResultSet(rs, "INT4", "TIMESTAMPTZ", 1, null);
+
+ SeaTunnelRow row = converter.toInternal(rs, tableSchema);
+
+ Assertions.assertNotNull(row);
+ Assertions.assertEquals(1, row.getField(0));
+ Assertions.assertNull(row.getField(1), "timestamp_tz_col should be
null");
+ }
+
+ @Test
+ public void testToInternalWithGeometryType() throws SQLException {
+ ResultSet rs = mock(ResultSet.class);
+ TableSchema tableSchema =
+ createTableSchema("geometry_col", BasicType.STRING_TYPE,
"GEOMETRY");
+
+ setupMockResultSet(rs, "INT4", "GEOMETRY", 1, "POINT(1 2)");
+
+ SeaTunnelRow row = converter.toInternal(rs, tableSchema);
+
+ Assertions.assertNotNull(row);
+ Assertions.assertEquals(1, row.getField(0));
+ Assertions.assertEquals("POINT(1 2)", row.getField(1));
+ }
+
+ @Test
+ public void testToInternalWithNullGeometryType() throws SQLException {
+ ResultSet rs = mock(ResultSet.class);
+ TableSchema tableSchema =
+ createTableSchema("geometry_col", BasicType.STRING_TYPE,
"GEOMETRY");
+
+ setupMockResultSet(rs, "INT4", "GEOMETRY", 1, null);
+
+ SeaTunnelRow row = converter.toInternal(rs, tableSchema);
+
+ Assertions.assertNotNull(row);
+ Assertions.assertEquals(1, row.getField(0));
+ Assertions.assertNull(row.getField(1), "geometry_col should be null");
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverterTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverterTest.java
index ce48ddab64..f8058a7c00 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverterTest.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverterTest.java
@@ -374,7 +374,7 @@ public class PostgresTypeConverterTest {
.build();
column = PostgresTypeConverter.INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
- Assertions.assertEquals(LocalTimeType.LOCAL_DATE_TIME_TYPE,
column.getDataType());
+ Assertions.assertEquals(LocalTimeType.OFFSET_DATE_TIME_TYPE,
column.getDataType());
Assertions.assertEquals(typeDefine.getScale(), column.getScale());
Assertions.assertEquals(typeDefine.getColumnType(),
column.getSourceType().toLowerCase());
@@ -387,7 +387,7 @@ public class PostgresTypeConverterTest {
.build();
column = PostgresTypeConverter.INSTANCE.convert(typeDefine);
Assertions.assertEquals(typeDefine.getName(), column.getName());
- Assertions.assertEquals(LocalTimeType.LOCAL_DATE_TIME_TYPE,
column.getDataType());
+ Assertions.assertEquals(LocalTimeType.OFFSET_DATE_TIME_TYPE,
column.getDataType());
Assertions.assertEquals(typeDefine.getScale(), column.getScale());
Assertions.assertEquals(typeDefine.getColumnType(),
column.getSourceType().toLowerCase());
}
@@ -744,6 +744,76 @@ public class PostgresTypeConverterTest {
typeDefine.getColumnType());
}
+ @Test
+ public void testConvertTimestampTz() {
+ BasicTypeDefine<Object> typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .columnType("timestamptz")
+ .dataType("timestamptz")
+ .build();
+ Column column = PostgresTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(LocalTimeType.OFFSET_DATE_TIME_TYPE,
column.getDataType());
+ Assertions.assertEquals(typeDefine.getScale(), column.getScale());
+ Assertions.assertEquals(typeDefine.getColumnType(),
column.getSourceType().toLowerCase());
+
+ typeDefine =
+ BasicTypeDefine.builder()
+ .name("test")
+ .columnType("timestamptz(6)")
+ .dataType("timestamptz")
+ .scale(6)
+ .build();
+ column = PostgresTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(LocalTimeType.OFFSET_DATE_TIME_TYPE,
column.getDataType());
+ Assertions.assertEquals(typeDefine.getScale(), column.getScale());
+ Assertions.assertEquals(typeDefine.getColumnType(),
column.getSourceType().toLowerCase());
+ }
+
+ @Test
+ public void testReconvertTimestampTz() {
+ Column column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(LocalTimeType.OFFSET_DATE_TIME_TYPE)
+ .build();
+
+ BasicTypeDefine<Object> typeDefine =
PostgresTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals(PostgresTypeConverter.PG_TIMESTAMP_TZ,
typeDefine.getColumnType());
+ Assertions.assertEquals(PostgresTypeConverter.PG_TIMESTAMP_TZ,
typeDefine.getDataType());
+
+ column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(LocalTimeType.OFFSET_DATE_TIME_TYPE)
+ .scale(3)
+ .build();
+
+ typeDefine = PostgresTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals(
+ String.format("%s(%s)", PostgresTypeConverter.PG_TIMESTAMP_TZ,
column.getScale()),
+ typeDefine.getColumnType());
+ Assertions.assertEquals(PostgresTypeConverter.PG_TIMESTAMP_TZ,
typeDefine.getDataType());
+ Assertions.assertEquals(column.getScale(), typeDefine.getScale());
+
+ column =
+ PhysicalColumn.builder()
+ .name("test")
+ .dataType(LocalTimeType.OFFSET_DATE_TIME_TYPE)
+ .scale(9)
+ .build();
+
+ typeDefine = PostgresTypeConverter.INSTANCE.reconvert(column);
+ Assertions.assertEquals(column.getName(), typeDefine.getName());
+ Assertions.assertEquals(
+ String.format("%s(%s)", PostgresTypeConverter.PG_TIMESTAMP_TZ,
6),
+ typeDefine.getColumnType());
+ }
+
@Test
public void testReconvertArray() {
Column column =
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcFieldTypeUtilsTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcFieldTypeUtilsTest.java
new file mode 100644
index 0000000000..8135b4b618
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcFieldTypeUtilsTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.utils;
+
+import org.junit.jupiter.api.Test;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.Date;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class JdbcFieldTypeUtilsTest {
+
+ @Test
+ public void testGetOffsetDateTimeFromTimestampUsesInstant() throws
SQLException {
+ Instant instant = Instant.parse("2025-01-01T00:00:00Z");
+ Timestamp timestamp = Timestamp.from(instant);
+
+ ResultSet rs = mock(ResultSet.class);
+ when(rs.getObject(1)).thenReturn(timestamp);
+ OffsetDateTime result = JdbcFieldTypeUtils.getOffsetDateTime(rs, 1);
+
+ assertEquals(instant, result.toInstant());
+ assertEquals(ZoneOffset.UTC, result.getOffset());
+ }
+
+ @Test
+ public void testGetOffsetDateTimeFromDate() throws SQLException {
+ Instant instant = Instant.parse("2025-02-02T12:34:56Z");
+ Date date = Date.from(instant);
+
+ ResultSet rs = mock(ResultSet.class);
+ when(rs.getObject(1)).thenReturn(date);
+ OffsetDateTime result = JdbcFieldTypeUtils.getOffsetDateTime(rs, 1);
+
+ assertEquals(instant, result.toInstant());
+ assertEquals(ZoneOffset.UTC, result.getOffset());
+ }
+
+ @Test
+ public void testGetOffsetDateTimeFromEpochMilli() throws SQLException {
+ Instant instant = Instant.parse("2025-03-03T08:00:00Z");
+ long epochMilli = instant.toEpochMilli();
+
+ ResultSet rs = mock(ResultSet.class);
+ when(rs.getObject(1)).thenReturn(epochMilli);
+ OffsetDateTime result = JdbcFieldTypeUtils.getOffsetDateTime(rs, 1);
+
+ assertEquals(instant, result.toInstant());
+ assertEquals(ZoneOffset.UTC, result.getOffset());
+ }
+
+ @Test
+ public void testGetOffsetDateTimeFromIsoString() throws SQLException {
+ Instant instant = Instant.parse("2025-04-04T16:20:30Z");
+ String value = "2025-04-04T16:20:30Z";
+
+ ResultSet rs = mock(ResultSet.class);
+ when(rs.getObject(1)).thenReturn(value);
+ OffsetDateTime result = JdbcFieldTypeUtils.getOffsetDateTime(rs, 1);
+
+ assertEquals(instant, result.toInstant());
+ assertEquals(ZoneOffset.UTC, result.getOffset());
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
index 16767e51ba..c0ae2372b5 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
@@ -99,6 +99,7 @@ public class JdbcPostgresIT extends TestSuiteBase implements
TestResource {
+ " bigserial_col BIGSERIAL,\n"
+ " date_col DATE,\n"
+ " timestamp_col TIMESTAMP,\n"
+ + " timestamp_tz_col TIMESTAMP WITH TIME ZONE,\n"
+ " bpchar_col BPCHAR(10),\n"
+ " age INT NOT null,\n"
+ " name VARCHAR(255) NOT null,\n"
@@ -135,6 +136,7 @@ public class JdbcPostgresIT extends TestSuiteBase
implements TestResource {
+ " bigserial_col BIGSERIAL,\n"
+ " date_col DATE,\n"
+ " timestamp_col TIMESTAMP,\n"
+ + " timestamp_tz_col TIMESTAMP WITH TIME ZONE,\n"
+ " bpchar_col BPCHAR(10),\n"
+ " age int4 NOT NULL,\n"
+ " name varchar(255) NOT NULL,\n"
@@ -146,7 +148,7 @@ public class JdbcPostgresIT extends TestSuiteBase
implements TestResource {
+ " multipolygon varchar(2000) NULL,\n"
+ " geometrycollection varchar(2000) NULL,\n"
+ " geog varchar(2000) NULL,\n"
- + " json_col json NOT NULL \n,"
+ + " json_col json NOT NULL,\n"
+ " jsonb_col jsonb NOT NULL,\n"
+ " xml_col xml NOT NULL\n"
+ " )";
@@ -171,6 +173,7 @@ public class JdbcPostgresIT extends TestSuiteBase
implements TestResource {
+ "bigserial_col,\n"
+ "date_col,\n"
+ "timestamp_col,\n"
+ + "timestamp_tz_col,\n"
+ "bpchar_col,\n"
+ "age,\n"
+ "name,\n"
@@ -207,7 +210,8 @@ public class JdbcPostgresIT extends TestSuiteBase
implements TestResource {
+ " bigserial_col,\n"
+ " date_col,\n"
+ " timestamp_col,\n"
- + " bpchar_col,"
+ + " timestamp_tz_col,\n"
+ + " bpchar_col,\n"
+ " age,\n"
+ " name,\n"
+ " cast(point as geometry) as point,\n"
@@ -215,7 +219,7 @@ public class JdbcPostgresIT extends TestSuiteBase
implements TestResource {
+ " cast(polygon_colums as geometry) as polygon_colums,\n"
+ " cast(multipoint as geometry) as multipoint,\n"
+ " cast(multilinestring as geometry) as
multilinestring,\n"
- + " cast(multipolygon as geometry) as multilinestring,\n"
+ + " cast(multipolygon as geometry) as multipolygon,\n"
+ " cast(geometrycollection as geometry) as
geometrycollection,\n"
+ " cast(geog as geography) as geog,\n"
+ " json_col,\n"
@@ -338,7 +342,14 @@ public class JdbcPostgresIT extends TestSuiteBase
implements TestResource {
+ " job run failed in "
+ container.getClass().getSimpleName()
+ ".");
- Assertions.assertIterableEquals(querySql(SOURCE_SQL),
querySql(SINK_SQL));
+ java.util.List<java.util.List<Object>> src =
querySql(SOURCE_SQL);
+ java.util.List<java.util.List<Object>> dst =
querySql(SINK_SQL);
+ if (!src.isEmpty() && !dst.isEmpty()) {
+ Object srcTz = src.get(0).size() > 19 ? src.get(0).get(19)
: null;
+ Object dstTz = dst.get(0).size() > 19 ? dst.get(0).get(19)
: null;
+ log.info("First row tz src={}, dst={}", srcTz, dstTz);
+ }
+ Assertions.assertIterableEquals(src, dst);
} finally {
executeSQL("truncate table pg_e2e_sink_table");
}
@@ -411,6 +422,7 @@ public class JdbcPostgresIT extends TestSuiteBase
implements TestResource {
+ " bigserial_col,\n"
+ " date_col,\n"
+ " timestamp_col,\n"
+ + " timestamp_tz_col,\n"
+ " bpchar_col,\n"
+ " age,\n"
+ " name,\n"
@@ -449,6 +461,7 @@ public class JdbcPostgresIT extends TestSuiteBase
implements TestResource {
+ " 10000,\n"
+ " '2023-05-07',\n"
+ " '2023-05-07 14:30:00',\n"
+ + " '2023-05-07 14:30:00+08:00',\n"
+ " 'Testing',\n"
+ " 21,\n"
+ " 'Leblanc',\n"
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink.conf
index 1915cfa3de..5d6785e647 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink.conf
@@ -27,7 +27,7 @@ source{
username = "test"
password = "test"
query ="""select gid, uuid_col, text_col, varchar_col, char_one_col,
char_col, boolean_col, smallint_col, integer_col, bigint_col, decimal_col,
numeric_col, real_col, double_precision_col,
- smallserial_col, serial_col, bigserial_col, date_col,
timestamp_col, bpchar_col, age, name, point, linestring, polygon_colums,
multipoint,
+ smallserial_col, serial_col, bigserial_col, date_col,
timestamp_col, timestamp_tz_col, bpchar_col, age, name, point, linestring,
polygon_colums, multipoint,
multilinestring, multipolygon, geometrycollection,
geog, json_col, jsonb_col,xml_col from pg_e2e_source_table"""
partition_column = "varchar_col"
partition_num = 2
@@ -46,4 +46,4 @@ sink {
table = public.pg_e2e_sink_table
primary_keys = ["gid"]
}
-}
\ No newline at end of file
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_copy_stmt.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_copy_stmt.conf
index 0bd44362ef..b8fa1b403d 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_copy_stmt.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_copy_stmt.conf
@@ -27,7 +27,7 @@ source{
username = "test"
password = "test"
query ="""select gid, uuid_col, text_col, varchar_col, char_one_col,
char_col, boolean_col, smallint_col, integer_col, bigint_col, decimal_col,
numeric_col, real_col, double_precision_col,
- smallserial_col, serial_col, bigserial_col, date_col,
timestamp_col, bpchar_col, age, name, point, linestring, polygon_colums,
multipoint,
+ smallserial_col, serial_col, bigserial_col, date_col,
timestamp_col, timestamp_tz_col, bpchar_col, age, name, point, linestring,
polygon_colums, multipoint,
multilinestring, multipolygon, geometrycollection,
geog, json_col, jsonb_col,xml_col from pg_e2e_source_table"""
partition_column = "varchar_col"
partition_num = 2
@@ -47,4 +47,4 @@ sink {
use_copy_statement = true
primary_keys = ["gid"]
}
-}
\ No newline at end of file
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel.conf
index 2d9cf1cb3b..f7775089ca 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel.conf
@@ -27,7 +27,7 @@ source{
username = "test"
password = "test"
query ="""select gid, uuid_col, text_col, varchar_col, char_one_col,
char_col, boolean_col, smallint_col, integer_col, bigint_col, decimal_col,
numeric_col, real_col, double_precision_col,
- smallserial_col, serial_col, bigserial_col, date_col,
timestamp_col, bpchar_col, age, name, point, linestring, polygon_colums,
multipoint,
+ smallserial_col, serial_col, bigserial_col, date_col,
timestamp_col, timestamp_tz_col, bpchar_col, age, name, point, linestring,
polygon_colums, multipoint,
multilinestring, multipolygon, geometrycollection,
geog, json_col, jsonb_col,xml_col from pg_e2e_source_table"""
partition_column= "gid"
@@ -46,8 +46,10 @@ sink {
password = "test"
connection_check_timeout_sec = 100
query ="""INSERT INTO pg_e2e_sink_table ( gid, uuid_col, text_col,
varchar_col, char_one_col, char_col, boolean_col, smallint_col, integer_col,
bigint_col, decimal_col, numeric_col, real_col,
- double_precision_col,
smallserial_col, serial_col, bigserial_col, date_col, timestamp_col,
bpchar_col, age, name, point,
+ double_precision_col,
smallserial_col, serial_col, bigserial_col, date_col, timestamp_col,
timestamp_tz_col, bpchar_col, age, name, point,
linestring,
polygon_colums, multipoint, multilinestring, multipolygon, geometrycollection,
geog, json_col, jsonb_col,xml_col)
- VALUES( ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,? )"""
+ VALUES(:gid, :uuid_col, :text_col,
:varchar_col, :char_one_col, :char_col, :boolean_col, :smallint_col,
:integer_col, :bigint_col, :decimal_col, :numeric_col, :real_col,
+ :double_precision_col,
:smallserial_col, :serial_col, :bigserial_col, :date_col, :timestamp_col,
:timestamp_tz_col, :bpchar_col, :age, :name, :point,
+ :linestring, :polygon_colums,
:multipoint, :multilinestring, :multipolygon, :geometrycollection, :geog,
:json_col, :jsonb_col, :xml_col)"""
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel_upper_lower.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel_upper_lower.conf
index 22d56fa568..c8c53e08bc 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel_upper_lower.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel_upper_lower.conf
@@ -27,7 +27,7 @@ source{
user = "test"
password = "test"
query ="""select gid, uuid_col, text_col, varchar_col, char_one_col,
char_col, boolean_col, smallint_col, integer_col, bigint_col, decimal_col,
numeric_col, real_col, double_precision_col,
- smallserial_col, serial_col, bigserial_col, date_col,
timestamp_col, bpchar_col, age, name, point, linestring, polygon_colums,
multipoint,
+ smallserial_col, serial_col, bigserial_col, date_col,
timestamp_col, timestamp_tz_col, bpchar_col, age, name, point, linestring,
polygon_colums, multipoint,
multilinestring, multipolygon, geometrycollection,
geog, json_col, jsonb_col,xml_col from pg_e2e_source_table"""
partition_column= "gid"
@@ -50,8 +50,10 @@ sink {
password = "test"
connection_check_timeout_sec = 100
query ="""INSERT INTO pg_e2e_sink_table ( gid, uuid_col, text_col,
varchar_col, char_one_col, char_col, boolean_col, smallint_col, integer_col,
bigint_col, decimal_col, numeric_col, real_col,
- double_precision_col,
smallserial_col, serial_col, bigserial_col, date_col, timestamp_col,
bpchar_col, age, name, point,
+ double_precision_col,
smallserial_col, serial_col, bigserial_col, date_col, timestamp_col,
timestamp_tz_col, bpchar_col, age, name, point,
linestring,
polygon_colums, multipoint, multilinestring, multipolygon, geometrycollection,
geog, json_col, jsonb_col,xml_col )
- VALUES( ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? ,?)"""
+ VALUES(:gid, :uuid_col, :text_col,
:varchar_col, :char_one_col, :char_col, :boolean_col, :smallint_col,
:integer_col, :bigint_col, :decimal_col, :numeric_col, :real_col,
+ :double_precision_col,
:smallserial_col, :serial_col, :bigserial_col, :date_col, :timestamp_col,
:timestamp_tz_col, :bpchar_col, :age, :name, :point,
+ :linestring, :polygon_colums,
:multipoint, :multilinestring, :multipolygon, :geometrycollection, :geog,
:json_col, :jsonb_col, :xml_col)"""
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_xa.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_xa.conf
index fc72a29f5f..5f484873e8 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_xa.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_xa.conf
@@ -27,7 +27,7 @@ source {
username = "test"
password = "test"
query ="""select gid, uuid_col, text_col, varchar_col, char_one_col,
char_col, boolean_col, smallint_col, integer_col, bigint_col, decimal_col,
numeric_col, real_col, double_precision_col,
- smallserial_col, serial_col, bigserial_col, date_col,
timestamp_col, bpchar_col, age, name, point, linestring, polygon_colums,
multipoint,
+ smallserial_col, serial_col, bigserial_col, date_col,
timestamp_col, timestamp_tz_col, bpchar_col, age, name, point, linestring,
polygon_colums, multipoint,
multilinestring, multipolygon, geometrycollection,
geog, json_col, jsonb_col ,xml_col from pg_e2e_source_table"""
}
}
@@ -43,9 +43,11 @@ sink {
password = "test"
max_retries = 0
query ="""INSERT INTO pg_e2e_sink_table ( gid, uuid_col, text_col,
varchar_col, char_one_col, char_col, boolean_col, smallint_col, integer_col,
bigint_col, decimal_col, numeric_col, real_col,
- double_precision_col,
smallserial_col, serial_col, bigserial_col, date_col, timestamp_col,
bpchar_col, age, name, point,
+ double_precision_col,
smallserial_col, serial_col, bigserial_col, date_col, timestamp_col,
timestamp_tz_col, bpchar_col, age, name, point,
linestring,
polygon_colums, multipoint, multilinestring, multipolygon, geometrycollection,
geog, json_col, jsonb_col ,xml_col)
- VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?)"""
+ VALUES(:gid, :uuid_col, :text_col,
:varchar_col, :char_one_col, :char_col, :boolean_col, :smallint_col,
:integer_col, :bigint_col, :decimal_col, :numeric_col, :real_col,
+ :double_precision_col,
:smallserial_col, :serial_col, :bigserial_col, :date_col, :timestamp_col,
:timestamp_tz_col, :bpchar_col, :age, :name, :point,
+ :linestring, :polygon_colums,
:multipoint, :multilinestring, :multipolygon, :geometrycollection, :geog,
:json_col, :jsonb_col, :xml_col)"""
is_exactly_once = "true"