This is an automated email from the ASF dual-hosted git repository.

zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 30b0a08495 [Feature][Connector-V2] Postgresql  support read 
TIMESTAMP_TZ (#10048)
30b0a08495 is described below

commit 30b0a0849555e2ac1bb489a19348a93867e99e7e
Author: yzeng1618 <[email protected]>
AuthorDate: Sat Nov 29 11:09:33 2025 +0800

    [Feature][Connector-V2] Postgresql  support read TIMESTAMP_TZ (#10048)
    
    Co-authored-by: zengyi <[email protected]>
---
 .../apache/seatunnel/api/table/type/BasicType.java |   3 +
 .../converter/AbstractJdbcRowConverter.java        |  14 +-
 .../dialect/psql/PostgresJdbcRowConverter.java     | 151 ++++++++++++++++-
 .../dialect/psql/PostgresTypeConverter.java        |  23 ++-
 .../CopyManagerBatchStatementExecutor.java         |  10 ++
 .../seatunnel/jdbc/utils/JdbcFieldTypeUtils.java   | 121 ++++++++++++-
 .../kingbase/KingbaseTypeConverterTest.java        |   4 +-
 .../dialect/psql/PostgresJdbcRowConverterTest.java | 188 +++++++++++++++++++++
 .../dialect/psql/PostgresTypeConverterTest.java    |  74 +++++++-
 .../jdbc/utils/JdbcFieldTypeUtilsTest.java         |  87 ++++++++++
 .../connectors/seatunnel/jdbc/JdbcPostgresIT.java  |  21 ++-
 .../resources/jdbc_postgres_source_and_sink.conf   |   4 +-
 .../jdbc_postgres_source_and_sink_copy_stmt.conf   |   4 +-
 .../jdbc_postgres_source_and_sink_parallel.conf    |   8 +-
 ...tgres_source_and_sink_parallel_upper_lower.conf |   8 +-
 .../jdbc_postgres_source_and_sink_xa.conf          |   8 +-
 16 files changed, 697 insertions(+), 31 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java
index 67f24ef412..1c05672947 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/BasicType.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.api.table.type;
 
+import java.time.OffsetDateTime;
 import java.util.Objects;
 
 public class BasicType<T> implements SeaTunnelDataType<T> {
@@ -35,6 +36,8 @@ public class BasicType<T> implements SeaTunnelDataType<T> {
     public static final BasicType<Double> DOUBLE_TYPE =
             new BasicType<>(Double.class, SqlType.DOUBLE);
     public static final BasicType<Void> VOID_TYPE = new 
BasicType<>(Void.class, SqlType.NULL);
+    public static final LocalTimeType<OffsetDateTime> OFFSET_DATE_TIME_TYPE =
+            LocalTimeType.OFFSET_DATE_TIME_TYPE;
 
     // 
--------------------------------------------------------------------------------------------
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
index d5c3aa95ac..c97eeeec04 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/converter/AbstractJdbcRowConverter.java
@@ -120,6 +120,11 @@ public abstract class AbstractJdbcRowConverter implements 
JdbcRowConverter {
                                     .map(e -> e.toLocalDateTime())
                                     .orElse(null);
                     break;
+                case TIMESTAMP_TZ:
+                    OffsetDateTime offsetDateTime =
+                            JdbcFieldTypeUtils.getOffsetDateTime(rs, 
resultSetIndex);
+                    fields[fieldIndex] = offsetDateTime;
+                    break;
                 case BYTES:
                     fields[fieldIndex] = JdbcFieldTypeUtils.getBytes(rs, 
resultSetIndex);
                     break;
@@ -278,7 +283,14 @@ public abstract class AbstractJdbcRowConverter implements 
JdbcRowConverter {
                 break;
             case TIMESTAMP_TZ:
                 OffsetDateTime offsetDateTime = (OffsetDateTime) value;
-                statement.setTimestamp(statementIndex, 
Timestamp.from(offsetDateTime.toInstant()));
+                try {
+                    // Try to use setObject first for better timezone support
+                    statement.setObject(statementIndex, offsetDateTime);
+                } catch (SQLException e) {
+                    // Fall back to setTimestamp if setObject is not supported
+                    statement.setTimestamp(
+                            statementIndex, 
Timestamp.from(offsetDateTime.toInstant()));
+                }
                 break;
             case BYTES:
                 statement.setBytes(statementIndex, (byte[]) value);
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
index aef78499ce..fd5f5d0d9c 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql;
 
+import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
 import org.apache.seatunnel.shade.org.apache.commons.lang3.math.NumberUtils;
 
 import org.apache.seatunnel.api.table.catalog.Column;
@@ -35,6 +36,8 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcFieldTypeUtils;
 
 import org.postgresql.util.PGobject;
 
+import lombok.extern.slf4j.Slf4j;
+
 import javax.annotation.Nullable;
 
 import java.math.BigDecimal;
@@ -50,6 +53,7 @@ import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
 import java.util.Locale;
 import java.util.Optional;
 
@@ -59,6 +63,7 @@ import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.ps
 import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeConverter.PG_MAC_ADDR;
 import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeConverter.PG_MAC_ADDR8;
 
+@Slf4j
 public class PostgresJdbcRowConverter extends AbstractJdbcRowConverter {
 
     private static final String PG_GEOMETRY = "GEOMETRY";
@@ -69,6 +74,29 @@ public class PostgresJdbcRowConverter extends 
AbstractJdbcRowConverter {
         return DatabaseIdentifier.POSTGRESQL;
     }
 
+    @Override
+    protected void setValueToStatementByDataType(
+            Object value,
+            PreparedStatement statement,
+            SeaTunnelDataType<?> seaTunnelDataType,
+            int statementIndex,
+            @Nullable String sourceType)
+            throws SQLException {
+        if (seaTunnelDataType.getSqlType().equals(SqlType.TIMESTAMP_TZ)) {
+            if (value == null) {
+                statement.setNull(statementIndex, 
java.sql.Types.TIMESTAMP_WITH_TIMEZONE);
+            } else {
+                PGobject timestampTzObject = new PGobject();
+                timestampTzObject.setType("timestamptz");
+                timestampTzObject.setValue(((OffsetDateTime) 
value).toString());
+                statement.setObject(statementIndex, timestampTzObject);
+            }
+            return;
+        }
+        super.setValueToStatementByDataType(
+                value, statement, seaTunnelDataType, statementIndex, 
sourceType);
+    }
+
     @Override
     public SeaTunnelRow toInternal(ResultSet rs, TableSchema tableSchema) 
throws SQLException {
         SeaTunnelRowType typeInfo = tableSchema.toPhysicalRowDataType();
@@ -82,10 +110,8 @@ public class PostgresJdbcRowConverter extends 
AbstractJdbcRowConverter {
                 case STRING:
                     if (metaDataColumnType.equals(PG_GEOMETRY)
                             || metaDataColumnType.equals(PG_GEOGRAPHY)) {
-                        fields[fieldIndex] =
-                                rs.getObject(resultSetIndex) == null
-                                        ? null
-                                        : 
rs.getObject(resultSetIndex).toString();
+                        Object geoObj = rs.getObject(resultSetIndex);
+                        fields[fieldIndex] = geoObj == null ? null : 
geoObj.toString();
                     } else {
                         fields[fieldIndex] = JdbcFieldTypeUtils.getString(rs, 
resultSetIndex);
                     }
@@ -131,6 +157,10 @@ public class PostgresJdbcRowConverter extends 
AbstractJdbcRowConverter {
                                     .map(e -> e.toLocalDateTime())
                                     .orElse(null);
                     break;
+                case TIMESTAMP_TZ:
+                    // PostgreSQL-specific read that tolerates several driver return types
+                    fields[fieldIndex] = getPostgresOffsetDateTime(rs, 
resultSetIndex);
+                    break;
                 case BYTES:
                     fields[fieldIndex] = JdbcFieldTypeUtils.getBytes(rs, 
resultSetIndex);
                     break;
@@ -255,9 +285,12 @@ public class PostgresJdbcRowConverter extends 
AbstractJdbcRowConverter {
                                 statementIndex, 
java.sql.Timestamp.valueOf(localDateTime));
                         break;
                     case TIMESTAMP_TZ:
-                        OffsetDateTime offsetDateTime = (OffsetDateTime) 
row.getField(fieldIndex);
-                        statement.setTimestamp(
-                                statementIndex, 
Timestamp.from(offsetDateTime.toInstant()));
+                        setValueToStatementByDataType(
+                                row.getField(fieldIndex),
+                                statement,
+                                seaTunnelDataType,
+                                statementIndex,
+                                sourceTypes[fieldIndex]);
                         break;
                     case BYTES:
                         statement.setBytes(statementIndex, (byte[]) 
row.getField(fieldIndex));
@@ -316,4 +349,108 @@ public class PostgresJdbcRowConverter extends 
AbstractJdbcRowConverter {
         if (seconds > 0) sb.append(seconds).append(" seconds");
         return sb.toString().trim();
     }
+
+    private OffsetDateTime getPostgresOffsetDateTime(ResultSet rs, int 
columnIndex)
+            throws SQLException {
+        // Read the value once to avoid drivers returning null on subsequent 
reads
+        final Object obj = rs.getObject(columnIndex);
+
+        if (obj == null) {
+            return null;
+        }
+
+        // Direct types
+        if (obj instanceof OffsetDateTime) {
+            return (OffsetDateTime) obj;
+        }
+        if (obj instanceof Timestamp) {
+            return ((Timestamp) obj).toInstant().atOffset(ZoneOffset.UTC);
+        }
+        if (obj instanceof java.time.ZonedDateTime) {
+            return ((java.time.ZonedDateTime) obj).toOffsetDateTime();
+        }
+        if (obj instanceof java.util.Date) {
+            return ((java.util.Date) obj).toInstant().atOffset(ZoneOffset.UTC);
+        }
+
+        // Remaining PostgreSQL-specific or driver types: fall back to string 
representation
+        return parseTimestampFromObjectString(obj);
+    }
+
+    private OffsetDateTime parsePostgresTimestampTz(String str) throws 
SQLException {
+        String normalized = normalizeIsoTimestamp(str);
+        if (normalized == null) {
+            return null;
+        }
+
+        try {
+            return OffsetDateTime.parse(normalized);
+        } catch (Exception primary) {
+            log.debug("Failed to parse PostgreSQL timestamptz as ISO-8601: 
{}", str, primary);
+            try {
+                String withoutOffset =
+                        
normalized.replaceFirst("([+-]\\d{2}:?\\d{2}|\\s+UTC|[zZ])$", "");
+                String fallback = withoutOffset.replace('T', ' ').trim();
+                Timestamp ts = Timestamp.valueOf(fallback);
+                return ts.toInstant().atOffset(ZoneOffset.UTC);
+            } catch (Exception secondary) {
+                log.debug(
+                        "Failed to parse PostgreSQL timestamptz as UTC 
timestamp: {}",
+                        str,
+                        secondary);
+                throw new SQLException(
+                        "Failed to parse PostgreSQL timestamptz string: " + 
str, secondary);
+            }
+        }
+    }
+
+    @Nullable private OffsetDateTime parseTimestampFromObjectString(Object 
obj) throws SQLException {
+        final String str;
+        try {
+            str = String.valueOf(obj);
+        } catch (Throwable e) {
+            log.debug(
+                    "Failed to get PostgreSQL timestamp object string 
representation from class: {}",
+                    obj.getClass().getName(),
+                    e);
+            return null;
+        }
+        return parsePostgresTimestampTz(str);
+    }
+
+    private String normalizeIsoTimestamp(String value) {
+        // PostgreSQL timestamptz format examples:
+        // "2023-12-25 10:30:45.123456+08:00"
+        // "2023-12-25 10:30:45+08"
+        // "2023-12-25 10:30:45.123456 UTC"
+        String normalized = StringUtils.trimToNull(value);
+        if (normalized == null) {
+            return null;
+        }
+        // Handle UTC timezone
+        if (normalized.endsWith(" UTC")) {
+            normalized = normalized.substring(0, normalized.length() - 4) + 
"Z";
+        }
+        // Normalize to ISO-8601 format examples:
+        // "2024-01-01T10:15:30+08:00"
+        // "2024-01-01T10:15:30Z"
+        normalized = normalized.replace(' ', 'T');
+        if (!normalized.isEmpty()) {
+            char lastChar = normalized.charAt(normalized.length() - 1);
+            if (lastChar == 'z' || lastChar == 'Z') {
+                normalized = normalized.substring(0, normalized.length() - 1) 
+ "Z";
+            }
+        }
+        // Append zero minutes to hour-only offsets like +HH -> +HH:00
+        if (normalized.matches(".*[+-]\\d{2}$")) {
+            return normalized + ":00";
+        }
+        if (normalized.matches(".*[+-]\\d{4}$")) {
+            // Add colon to offsets like +HHMM -> +HH:MM
+            return normalized.substring(0, normalized.length() - 2)
+                    + ":"
+                    + normalized.substring(normalized.length() - 2);
+        }
+        return normalized;
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverter.java
index f472d3bce5..418b894b21 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverter.java
@@ -262,7 +262,6 @@ public class PostgresTypeConverter implements 
TypeConverter<BasicTypeDefine> {
                 }
                 break;
             case PG_TIMESTAMP:
-            case PG_TIMESTAMP_TZ:
                 builder.dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE);
                 if (typeDefine.getScale() != null && typeDefine.getScale() > 
MAX_TIMESTAMP_SCALE) {
                     builder.scale(MAX_TIMESTAMP_SCALE);
@@ -274,6 +273,15 @@ public class PostgresTypeConverter implements 
TypeConverter<BasicTypeDefine> {
                     builder.scale(typeDefine.getScale());
                 }
                 break;
+            case PG_TIMESTAMP_TZ:
+                // timestamptz -> TIMESTAMP_TZ
+                builder.dataType(LocalTimeType.OFFSET_DATE_TIME_TYPE);
+                if (typeDefine.getScale() != null && typeDefine.getScale() > 
MAX_TIMESTAMP_SCALE) {
+                    builder.scale(MAX_TIMESTAMP_SCALE);
+                } else {
+                    builder.scale(typeDefine.getScale());
+                }
+                break;
             default:
                 throw CommonError.convertToSeaTunnelTypeError(
                         identifier(), typeDefine.getDataType(), 
typeDefine.getName());
@@ -444,6 +452,19 @@ public class PostgresTypeConverter implements 
TypeConverter<BasicTypeDefine> {
                 builder.dataType(PG_TIMESTAMP);
                 builder.scale(timestampScale);
                 break;
+            case TIMESTAMP_TZ:
+                Integer timestampTzScale = column.getScale();
+                if (timestampTzScale != null && timestampTzScale > 
MAX_TIMESTAMP_SCALE) {
+                    timestampTzScale = MAX_TIMESTAMP_SCALE;
+                }
+                String timestampTzColumnType =
+                        (timestampTzScale != null && timestampTzScale > 0)
+                                ? String.format("%s(%s)", PG_TIMESTAMP_TZ, 
timestampTzScale)
+                                : PG_TIMESTAMP_TZ;
+                builder.columnType(timestampTzColumnType);
+                builder.dataType(PG_TIMESTAMP_TZ);
+                builder.scale(timestampTzScale);
+                break;
             case ARRAY:
                 ArrayType arrayType = (ArrayType) column.getDataType();
                 SeaTunnelDataType elementType = arrayType.getElementType();
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/CopyManagerBatchStatementExecutor.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/CopyManagerBatchStatementExecutor.java
index b485d39de1..45f5b0996b 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/CopyManagerBatchStatementExecutor.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/executor/CopyManagerBatchStatementExecutor.java
@@ -37,6 +37,7 @@ import java.sql.SQLException;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.time.OffsetDateTime;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -145,6 +146,15 @@ public class CopyManagerBatchStatementExecutor implements 
JdbcBatchStatementExec
                     LocalDateTime localDateTime = (LocalDateTime) 
record.getField(fieldIndex);
                     csvRecord.add((java.sql.Timestamp) 
java.sql.Timestamp.valueOf(localDateTime));
                     break;
+                case TIMESTAMP_TZ:
+                    OffsetDateTime offsetDateTime = (OffsetDateTime) 
record.getField(fieldIndex);
+                    if (offsetDateTime != null) {
+                        String timestampTzStr = 
offsetDateTime.toString().replace('T', ' ');
+                        csvRecord.add(timestampTzStr);
+                    } else {
+                        csvRecord.add(null);
+                    }
+                    break;
                 case BYTES:
                     csvRecord.add(
                             
org.apache.commons.codec.binary.Base64.encodeBase64String(
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcFieldTypeUtils.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcFieldTypeUtils.java
index 8944824779..9287451171 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcFieldTypeUtils.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcFieldTypeUtils.java
@@ -22,6 +22,11 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeParseException;
 
 public final class JdbcFieldTypeUtils {
 
@@ -95,12 +100,126 @@ public final class JdbcFieldTypeUtils {
         return resultSet.getBytes(columnIndex);
     }
 
+    public static OffsetDateTime getOffsetDateTime(ResultSet resultSet, int 
columnIndex)
+            throws SQLException {
+        final Object obj = resultSet.getObject(columnIndex);
+        if (obj == null) {
+            return null;
+        }
+
+        // Handle OffsetDateTime directly
+        if (obj instanceof OffsetDateTime) {
+            return (OffsetDateTime) obj;
+        }
+
+        // Handle ZonedDateTime
+        if (obj instanceof ZonedDateTime) {
+            return ((ZonedDateTime) obj).toOffsetDateTime();
+        }
+
+        // Handle Instant
+        if (obj instanceof Instant) {
+            return ((Instant) obj).atOffset(ZoneOffset.UTC);
+        }
+
+        // Handle java.sql.Timestamp
+        if (obj instanceof Timestamp) {
+            return ((Timestamp) obj).toInstant().atOffset(ZoneOffset.UTC);
+        }
+
+        // Handle java.util.Date
+        if (obj instanceof java.util.Date) {
+            return ((java.util.Date) obj).toInstant().atOffset(ZoneOffset.UTC);
+        }
+
+        // Handle Long (epoch milliseconds)
+        if (obj instanceof Long) {
+            return Instant.ofEpochMilli((Long) obj).atOffset(ZoneOffset.UTC);
+        }
+
+        // Try to parse as string
+        String str = obj.toString();
+        try {
+            return parseOffsetDateTimeFromString(str);
+        } catch (Exception e) {
+            throw new SQLException(
+                    "Failed to parse OffsetDateTime value: "
+                            + str
+                            + " (class: "
+                            + obj.getClass().getName()
+                            + ")",
+                    e);
+        }
+    }
+
+    public static OffsetDateTime parseOffsetDateTimeFromString(String str)
+            throws DateTimeParseException {
+        String trimmed = str.trim();
+        // Treat empty string as "no value"
+        if (trimmed.isEmpty()) {
+            return null;
+        }
+        // Try parsing as standard ISO-8601 OffsetDateTime
+        OffsetDateTime directParsed = tryParseOffsetDateTime(trimmed);
+        if (directParsed != null) {
+            return directParsed;
+        }
+        // Normalize common relaxed forms and try again
+        String normalized = normalizeOffsetDateTimeString(trimmed);
+        OffsetDateTime normalizedParsed = tryParseOffsetDateTime(normalized);
+        if (normalizedParsed != null) {
+            return normalizedParsed;
+        }
+        // Finally, try parsing as ZonedDateTime and convert to OffsetDateTime
+        OffsetDateTime zonedParsed = tryParseZonedDateTime(trimmed);
+        if (zonedParsed != null) {
+            return zonedParsed;
+        }
+
+        throw new DateTimeParseException(
+                "Unable to parse OffsetDateTime from string: " + str, trimmed, 
0);
+    }
+
+    private static OffsetDateTime tryParseOffsetDateTime(String value) {
+        try {
+            return OffsetDateTime.parse(value);
+        } catch (DateTimeParseException ignore) {
+            return null;
+        }
+    }
+
+    private static OffsetDateTime tryParseZonedDateTime(String value) {
+        try {
+            return ZonedDateTime.parse(value).toOffsetDateTime();
+        } catch (DateTimeParseException ignore) {
+            return null;
+        }
+    }
+
+    private static String normalizeOffsetDateTimeString(String value) {
+        String normalized = value;
+        if (normalized.endsWith(" UTC")) {
+            normalized = normalized.substring(0, normalized.length() - 4) + 
"Z";
+        }
+        normalized = normalized.replace(' ', 'T');
+        if (normalized.matches(".*[+-]\\d{2}$")) {
+            normalized = normalized + ":00";
+        } else if (normalized.matches(".*[+-]\\d{4}$")) {
+            normalized =
+                    normalized.substring(0, normalized.length() - 2)
+                            + ":"
+                            + normalized.substring(normalized.length() - 2);
+        }
+        return normalized;
+    }
+
     private static <T> T getNullableValue(
             ResultSet resultSet,
             int columnIndex,
             ThrowingFunction<ResultSet, T, SQLException> getter)
             throws SQLException {
-        if (resultSet.getObject(columnIndex) == null) {
+        final Object obj = resultSet.getObject(columnIndex);
+        if (obj == null) {
             return null;
         }
         return getter.apply(resultSet, columnIndex);
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseTypeConverterTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseTypeConverterTest.java
index 11c99b41a8..17176db809 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseTypeConverterTest.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseTypeConverterTest.java
@@ -374,7 +374,7 @@ public class KingbaseTypeConverterTest {
                         .build();
         column = KingbaseTypeConverter.INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
-        Assertions.assertEquals(LocalTimeType.LOCAL_DATE_TIME_TYPE, 
column.getDataType());
+        Assertions.assertEquals(LocalTimeType.OFFSET_DATE_TIME_TYPE, 
column.getDataType());
         Assertions.assertEquals(typeDefine.getScale(), column.getScale());
         Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType().toLowerCase());
 
@@ -387,7 +387,7 @@ public class KingbaseTypeConverterTest {
                         .build();
         column = KingbaseTypeConverter.INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
-        Assertions.assertEquals(LocalTimeType.LOCAL_DATE_TIME_TYPE, 
column.getDataType());
+        Assertions.assertEquals(LocalTimeType.OFFSET_DATE_TIME_TYPE, 
column.getDataType());
         Assertions.assertEquals(typeDefine.getScale(), column.getScale());
         Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType().toLowerCase());
     }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverterTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverterTest.java
new file mode 100644
index 0000000000..d18986cecf
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverterTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql;
+
+import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.postgresql.util.PGobject;
+
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests {@code PostgresJdbcRowConverter.toInternal} against a mocked {@link ResultSet}.
+ *
+ * <p>Each test reads a two-column row: an {@code INT4} id followed by the column under test.
+ */
+public class PostgresJdbcRowConverterTest {
+
+    private PostgresJdbcRowConverter converter;
+
+    @BeforeEach
+    public void setUp() {
+        converter = new PostgresJdbcRowConverter();
+    }
+
+    /**
+     * Builds a schema with an {@code INT} {@code id} column followed by the column under test.
+     *
+     * @param col2Name name of the second column
+     * @param col2DataType SeaTunnel data type of the second column
+     * @param col2SourceType source type of the second column, or {@code null} to leave it unset
+     * @return the schema holding both columns
+     */
+    private TableSchema createTableSchema(
+            String col2Name, SeaTunnelDataType<?> col2DataType, String col2SourceType) {
+        List<Column> columns = new ArrayList<>();
+        columns.add(PhysicalColumn.builder().name("id").dataType(BasicType.INT_TYPE).build());
+        PhysicalColumn.PhysicalColumnBuilder builder =
+                PhysicalColumn.builder().name(col2Name).dataType(col2DataType);
+        if (col2SourceType != null) {
+            builder.sourceType(col2SourceType);
+        }
+        columns.add(builder.build());
+        return TableSchema.builder().columns(columns).build();
+    }
+
+    /**
+     * Stubs {@code rs} as a two-column row with the given column type names and values.
+     *
+     * @param rs mocked result set to configure
+     * @param col1Type type name reported for column 1
+     * @param col2Type type name reported for column 2
+     * @param col1Value value returned for column 1
+     * @param col2Value value returned for column 2, may be {@code null}
+     * @throws SQLException declared by the stubbed JDBC calls
+     */
+    private void setupMockResultSet(
+            ResultSet rs, String col1Type, String col2Type, Object col1Value, Object col2Value)
+            throws SQLException {
+        ResultSetMetaData metaData = mock(ResultSetMetaData.class);
+        when(rs.getMetaData()).thenReturn(metaData);
+        when(metaData.getColumnCount()).thenReturn(2);
+        when(metaData.getColumnTypeName(1)).thenReturn(col1Type);
+        when(metaData.getColumnTypeName(2)).thenReturn(col2Type);
+        // A stubbed value is returned on every call, so repeated getObject() reads agree.
+        when(rs.getObject(1)).thenReturn(col1Value);
+        when(rs.getObject(2)).thenReturn(col2Value);
+        // getInt() is only stubbed when column 1 actually holds an integer.
+        if (col1Value instanceof Integer) {
+            when(rs.getInt(1)).thenReturn((Integer) col1Value);
+        }
+    }
+
+    /** Asserts the date, hour, minute and offset fields; seconds and nanos are not checked. */
+    private void assertOffsetDateTime(
+            OffsetDateTime offsetDateTime,
+            int year,
+            int month,
+            int day,
+            int hour,
+            int minute,
+            ZoneOffset offset) {
+        Assertions.assertEquals(year, offsetDateTime.getYear());
+        Assertions.assertEquals(month, offsetDateTime.getMonthValue());
+        Assertions.assertEquals(day, offsetDateTime.getDayOfMonth());
+        Assertions.assertEquals(hour, offsetDateTime.getHour());
+        Assertions.assertEquals(minute, offsetDateTime.getMinute());
+        Assertions.assertEquals(offset, offsetDateTime.getOffset());
+    }
+
+    /** A timestamptz delivered as a {@link PGobject} is read as an {@link OffsetDateTime}. */
+    @Test
+    public void testToInternalWithTimestampTzFromPGobject() throws SQLException {
+        ResultSet rs = mock(ResultSet.class);
+        TableSchema tableSchema =
+                createTableSchema("timestamp_tz_col", LocalTimeType.OFFSET_DATE_TIME_TYPE, null);
+
+        PGobject pgObject = new PGobject();
+        pgObject.setType("timestamptz");
+        pgObject.setValue("2023-05-07 14:30:00+08:00");
+
+        setupMockResultSet(rs, "INT4", "TIMESTAMPTZ", 1, pgObject);
+
+        SeaTunnelRow row = converter.toInternal(rs, tableSchema);
+
+        Assertions.assertNotNull(row);
+        Assertions.assertEquals(1, row.getField(0));
+
+        OffsetDateTime offsetDateTime = (OffsetDateTime) row.getField(1);
+        Assertions.assertNotNull(
+                offsetDateTime, "timestamp_tz_col should not be null when reading from PGobject");
+        assertOffsetDateTime(offsetDateTime, 2023, 5, 7, 14, 30, ZoneOffset.ofHours(8));
+    }
+
+    /** A timestamptz delivered as a plain string is read as an {@link OffsetDateTime}. */
+    @Test
+    public void testToInternalWithTimestampTzFromString() throws SQLException {
+        ResultSet rs = mock(ResultSet.class);
+        TableSchema tableSchema =
+                createTableSchema("timestamp_tz_col", LocalTimeType.OFFSET_DATE_TIME_TYPE, null);
+
+        setupMockResultSet(rs, "INT4", "TIMESTAMPTZ", 1, "2023-05-07 14:30:00+08:00");
+
+        SeaTunnelRow row = converter.toInternal(rs, tableSchema);
+
+        Assertions.assertNotNull(row);
+        Assertions.assertEquals(1, row.getField(0));
+
+        OffsetDateTime offsetDateTime = (OffsetDateTime) row.getField(1);
+        Assertions.assertNotNull(
+                offsetDateTime, "timestamp_tz_col should not be null when reading from string");
+        assertOffsetDateTime(offsetDateTime, 2023, 5, 7, 14, 30, ZoneOffset.ofHours(8));
+    }
+
+    /** A null timestamptz value yields a null field. */
+    @Test
+    public void testToInternalWithNullTimestampTz() throws SQLException {
+        ResultSet rs = mock(ResultSet.class);
+        TableSchema tableSchema =
+                createTableSchema("timestamp_tz_col", LocalTimeType.OFFSET_DATE_TIME_TYPE, null);
+
+        setupMockResultSet(rs, "INT4", "TIMESTAMPTZ", 1, null);
+
+        SeaTunnelRow row = converter.toInternal(rs, tableSchema);
+
+        Assertions.assertNotNull(row);
+        Assertions.assertEquals(1, row.getField(0));
+        Assertions.assertNull(row.getField(1), "timestamp_tz_col should be null");
+    }
+
+    /** A GEOMETRY source column typed as STRING passes its string value through unchanged. */
+    @Test
+    public void testToInternalWithGeometryType() throws SQLException {
+        ResultSet rs = mock(ResultSet.class);
+        TableSchema tableSchema =
+                createTableSchema("geometry_col", BasicType.STRING_TYPE, "GEOMETRY");
+
+        setupMockResultSet(rs, "INT4", "GEOMETRY", 1, "POINT(1 2)");
+
+        SeaTunnelRow row = converter.toInternal(rs, tableSchema);
+
+        Assertions.assertNotNull(row);
+        Assertions.assertEquals(1, row.getField(0));
+        Assertions.assertEquals("POINT(1 2)", row.getField(1));
+    }
+
+    /** A null GEOMETRY value yields a null field. */
+    @Test
+    public void testToInternalWithNullGeometryType() throws SQLException {
+        ResultSet rs = mock(ResultSet.class);
+        TableSchema tableSchema =
+                createTableSchema("geometry_col", BasicType.STRING_TYPE, "GEOMETRY");
+
+        setupMockResultSet(rs, "INT4", "GEOMETRY", 1, null);
+
+        SeaTunnelRow row = converter.toInternal(rs, tableSchema);
+
+        Assertions.assertNotNull(row);
+        Assertions.assertEquals(1, row.getField(0));
+        Assertions.assertNull(row.getField(1), "geometry_col should be null");
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverterTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverterTest.java
index ce48ddab64..f8058a7c00 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverterTest.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverterTest.java
@@ -374,7 +374,7 @@ public class PostgresTypeConverterTest {
                         .build();
         column = PostgresTypeConverter.INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
-        Assertions.assertEquals(LocalTimeType.LOCAL_DATE_TIME_TYPE, 
column.getDataType());
+        Assertions.assertEquals(LocalTimeType.OFFSET_DATE_TIME_TYPE, 
column.getDataType());
         Assertions.assertEquals(typeDefine.getScale(), column.getScale());
         Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType().toLowerCase());
 
@@ -387,7 +387,7 @@ public class PostgresTypeConverterTest {
                         .build();
         column = PostgresTypeConverter.INSTANCE.convert(typeDefine);
         Assertions.assertEquals(typeDefine.getName(), column.getName());
-        Assertions.assertEquals(LocalTimeType.LOCAL_DATE_TIME_TYPE, 
column.getDataType());
+        Assertions.assertEquals(LocalTimeType.OFFSET_DATE_TIME_TYPE, 
column.getDataType());
         Assertions.assertEquals(typeDefine.getScale(), column.getScale());
         Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType().toLowerCase());
     }
@@ -744,6 +744,86 @@ public class PostgresTypeConverterTest {
                 typeDefine.getColumnType());
     }
 
+    /** {@code timestamptz}, with or without a scale, converts to OFFSET_DATE_TIME_TYPE. */
+    @Test
+    public void testConvertTimestampTz() {
+        // No scale declared.
+        BasicTypeDefine<Object> typeDefine =
+                BasicTypeDefine.builder()
+                        .name("test")
+                        .columnType("timestamptz")
+                        .dataType("timestamptz")
+                        .build();
+        Column column = PostgresTypeConverter.INSTANCE.convert(typeDefine);
+        Assertions.assertEquals(typeDefine.getName(), column.getName());
+        Assertions.assertEquals(LocalTimeType.OFFSET_DATE_TIME_TYPE, column.getDataType());
+        Assertions.assertEquals(typeDefine.getScale(), column.getScale());
+        Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType().toLowerCase());
+
+        // Explicit scale of 6.
+        typeDefine =
+                BasicTypeDefine.builder()
+                        .name("test")
+                        .columnType("timestamptz(6)")
+                        .dataType("timestamptz")
+                        .scale(6)
+                        .build();
+        column = PostgresTypeConverter.INSTANCE.convert(typeDefine);
+        Assertions.assertEquals(typeDefine.getName(), column.getName());
+        Assertions.assertEquals(LocalTimeType.OFFSET_DATE_TIME_TYPE, column.getDataType());
+        Assertions.assertEquals(typeDefine.getScale(), column.getScale());
+        Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType().toLowerCase());
+    }
+
+    /**
+     * Reconverting OFFSET_DATE_TIME_TYPE is expected to yield {@code PG_TIMESTAMP_TZ}, with the
+     * column scale appended to the column type and a scale of 9 reported as 6.
+     */
+    @Test
+    public void testReconvertTimestampTz() {
+        // No scale: column type and data type are both the bare type name.
+        Column column =
+                PhysicalColumn.builder()
+                        .name("test")
+                        .dataType(LocalTimeType.OFFSET_DATE_TIME_TYPE)
+                        .build();
+
+        BasicTypeDefine<Object> typeDefine = PostgresTypeConverter.INSTANCE.reconvert(column);
+        Assertions.assertEquals(column.getName(), typeDefine.getName());
+        Assertions.assertEquals(PostgresTypeConverter.PG_TIMESTAMP_TZ, typeDefine.getColumnType());
+        Assertions.assertEquals(PostgresTypeConverter.PG_TIMESTAMP_TZ, typeDefine.getDataType());
+
+        // Scale 3: the scale is kept and appears in the column type.
+        column =
+                PhysicalColumn.builder()
+                        .name("test")
+                        .dataType(LocalTimeType.OFFSET_DATE_TIME_TYPE)
+                        .scale(3)
+                        .build();
+
+        typeDefine = PostgresTypeConverter.INSTANCE.reconvert(column);
+        Assertions.assertEquals(column.getName(), typeDefine.getName());
+        Assertions.assertEquals(
+                String.format("%s(%s)", PostgresTypeConverter.PG_TIMESTAMP_TZ, column.getScale()),
+                typeDefine.getColumnType());
+        Assertions.assertEquals(PostgresTypeConverter.PG_TIMESTAMP_TZ, typeDefine.getDataType());
+        Assertions.assertEquals(column.getScale(), typeDefine.getScale());
+
+        // Scale 9: the expected column type carries scale 6 instead.
+        column =
+                PhysicalColumn.builder()
+                        .name("test")
+                        .dataType(LocalTimeType.OFFSET_DATE_TIME_TYPE)
+                        .scale(9)
+                        .build();
+
+        typeDefine = PostgresTypeConverter.INSTANCE.reconvert(column);
+        Assertions.assertEquals(column.getName(), typeDefine.getName());
+        Assertions.assertEquals(
+                String.format("%s(%s)", PostgresTypeConverter.PG_TIMESTAMP_TZ, 6),
+                typeDefine.getColumnType());
+    }
+
     @Test
     public void testReconvertArray() {
         Column column =
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcFieldTypeUtilsTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcFieldTypeUtilsTest.java
new file mode 100644
index 0000000000..8135b4b618
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcFieldTypeUtilsTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.utils;
+
+import org.junit.jupiter.api.Test;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.Date;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@code JdbcFieldTypeUtils.getOffsetDateTime}, covering the raw column value types
+ * a mocked {@link ResultSet} hands back from {@code getObject}.
+ */
+public class JdbcFieldTypeUtilsTest {
+
+    /** A {@link Timestamp} keeps its instant and is reported at UTC. */
+    @Test
+    public void testGetOffsetDateTimeFromTimestampUsesInstant() throws SQLException {
+        Instant instant = Instant.parse("2025-01-01T00:00:00Z");
+        Timestamp timestamp = Timestamp.from(instant);
+
+        ResultSet rs = mock(ResultSet.class);
+        when(rs.getObject(1)).thenReturn(timestamp);
+        OffsetDateTime result = JdbcFieldTypeUtils.getOffsetDateTime(rs, 1);
+
+        assertEquals(instant, result.toInstant());
+        assertEquals(ZoneOffset.UTC, result.getOffset());
+    }
+
+    /** A {@link Date} keeps its instant and is reported at UTC. */
+    @Test
+    public void testGetOffsetDateTimeFromDate() throws SQLException {
+        Instant instant = Instant.parse("2025-02-02T12:34:56Z");
+        Date date = Date.from(instant);
+
+        ResultSet rs = mock(ResultSet.class);
+        when(rs.getObject(1)).thenReturn(date);
+        OffsetDateTime result = JdbcFieldTypeUtils.getOffsetDateTime(rs, 1);
+
+        assertEquals(instant, result.toInstant());
+        assertEquals(ZoneOffset.UTC, result.getOffset());
+    }
+
+    /** A {@code Long} is interpreted as epoch milliseconds and reported at UTC. */
+    @Test
+    public void testGetOffsetDateTimeFromEpochMilli() throws SQLException {
+        Instant instant = Instant.parse("2025-03-03T08:00:00Z");
+        long epochMilli = instant.toEpochMilli();
+
+        ResultSet rs = mock(ResultSet.class);
+        when(rs.getObject(1)).thenReturn(epochMilli);
+        OffsetDateTime result = JdbcFieldTypeUtils.getOffsetDateTime(rs, 1);
+
+        assertEquals(instant, result.toInstant());
+        assertEquals(ZoneOffset.UTC, result.getOffset());
+    }
+
+    /** An ISO-8601 string with a {@code Z} suffix is parsed and reported at UTC. */
+    @Test
+    public void testGetOffsetDateTimeFromIsoString() throws SQLException {
+        Instant instant = Instant.parse("2025-04-04T16:20:30Z");
+        String value = "2025-04-04T16:20:30Z";
+
+        ResultSet rs = mock(ResultSet.class);
+        when(rs.getObject(1)).thenReturn(value);
+        OffsetDateTime result = JdbcFieldTypeUtils.getOffsetDateTime(rs, 1);
+
+        assertEquals(instant, result.toInstant());
+        assertEquals(ZoneOffset.UTC, result.getOffset());
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
index 16767e51ba..c0ae2372b5 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
@@ -99,6 +99,7 @@ public class JdbcPostgresIT extends TestSuiteBase implements 
TestResource {
                     + "  bigserial_col BIGSERIAL,\n"
                     + "  date_col DATE,\n"
                     + "  timestamp_col TIMESTAMP,\n"
+                    + "  timestamp_tz_col TIMESTAMP WITH TIME ZONE,\n"
                     + "  bpchar_col BPCHAR(10),\n"
                     + "  age INT NOT null,\n"
                     + "  name VARCHAR(255) NOT null,\n"
@@ -135,6 +136,7 @@ public class JdbcPostgresIT extends TestSuiteBase 
implements TestResource {
                     + "    bigserial_col BIGSERIAL,\n"
                     + "    date_col DATE,\n"
                     + "    timestamp_col TIMESTAMP,\n"
+                    + "    timestamp_tz_col TIMESTAMP WITH TIME ZONE,\n"
                     + "    bpchar_col BPCHAR(10),\n"
                     + "    age int4 NOT NULL,\n"
                     + "    name varchar(255) NOT NULL,\n"
@@ -146,7 +148,7 @@ public class JdbcPostgresIT extends TestSuiteBase 
implements TestResource {
                     + "    multipolygon varchar(2000) NULL,\n"
                     + "    geometrycollection varchar(2000) NULL,\n"
                     + "    geog varchar(2000) NULL,\n"
-                    + "    json_col json NOT NULL \n,"
+                    + "    json_col json NOT NULL,\n"
                     + "    jsonb_col jsonb NOT NULL,\n"
                     + "    xml_col xml NOT NULL\n"
                     + "  )";
@@ -171,6 +173,7 @@ public class JdbcPostgresIT extends TestSuiteBase 
implements TestResource {
                     + "bigserial_col,\n"
                     + "date_col,\n"
                     + "timestamp_col,\n"
+                    + "timestamp_tz_col,\n"
                     + "bpchar_col,\n"
                     + "age,\n"
                     + "name,\n"
@@ -207,7 +210,8 @@ public class JdbcPostgresIT extends TestSuiteBase 
implements TestResource {
                     + "   bigserial_col,\n"
                     + "   date_col,\n"
                     + "   timestamp_col,\n"
-                    + "   bpchar_col,"
+                    + "   timestamp_tz_col,\n"
+                    + "   bpchar_col,\n"
                     + "  age,\n"
                     + "  name,\n"
                     + "  cast(point as geometry) as point,\n"
@@ -215,7 +219,7 @@ public class JdbcPostgresIT extends TestSuiteBase 
implements TestResource {
                     + "  cast(polygon_colums as geometry) as polygon_colums,\n"
                     + "  cast(multipoint as geometry) as multipoint,\n"
                     + "  cast(multilinestring as geometry) as 
multilinestring,\n"
-                    + "  cast(multipolygon as geometry) as multilinestring,\n"
+                    + "  cast(multipolygon as geometry) as multipolygon,\n"
                     + "  cast(geometrycollection as geometry) as 
geometrycollection,\n"
                     + "  cast(geog as geography) as geog,\n"
                     + "   json_col,\n"
@@ -338,7 +342,14 @@ public class JdbcPostgresIT extends TestSuiteBase 
implements TestResource {
                                 + " job run failed in "
                                 + container.getClass().getSimpleName()
                                 + ".");
-                Assertions.assertIterableEquals(querySql(SOURCE_SQL), 
querySql(SINK_SQL));
+                java.util.List<java.util.List<Object>> src = 
querySql(SOURCE_SQL);
+                java.util.List<java.util.List<Object>> dst = 
querySql(SINK_SQL);
+                if (!src.isEmpty() && !dst.isEmpty()) {
+                    Object srcTz = src.get(0).size() > 19 ? src.get(0).get(19) 
: null;
+                    Object dstTz = dst.get(0).size() > 19 ? dst.get(0).get(19) 
: null;
+                    log.info("First row tz src={}, dst={}", srcTz, dstTz);
+                }
+                Assertions.assertIterableEquals(src, dst);
             } finally {
                 executeSQL("truncate table pg_e2e_sink_table");
             }
@@ -411,6 +422,7 @@ public class JdbcPostgresIT extends TestSuiteBase 
implements TestResource {
                                 + "    bigserial_col,\n"
                                 + "    date_col,\n"
                                 + "    timestamp_col,\n"
+                                + "    timestamp_tz_col,\n"
                                 + "    bpchar_col,\n"
                                 + "    age,\n"
                                 + "    name,\n"
@@ -449,6 +461,7 @@ public class JdbcPostgresIT extends TestSuiteBase 
implements TestResource {
                                 + "    10000,\n"
                                 + "    '2023-05-07',\n"
                                 + "    '2023-05-07 14:30:00',\n"
+                                + "    '2023-05-07 14:30:00+08:00',\n"
                                 + "    'Testing',\n"
                                 + "    21,\n"
                                 + "    'Leblanc',\n"
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink.conf
index 1915cfa3de..5d6785e647 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink.conf
@@ -27,7 +27,7 @@ source{
         username = "test"
         password = "test"
         query ="""select gid, uuid_col, text_col, varchar_col, char_one_col, 
char_col, boolean_col, smallint_col, integer_col, bigint_col, decimal_col, 
numeric_col, real_col, double_precision_col,
-                         smallserial_col, serial_col, bigserial_col, date_col, 
timestamp_col, bpchar_col, age, name, point, linestring, polygon_colums, 
multipoint,
+                         smallserial_col, serial_col, bigserial_col, date_col, 
timestamp_col, timestamp_tz_col, bpchar_col, age, name, point, linestring, 
polygon_colums, multipoint,
                          multilinestring, multipolygon, geometrycollection, 
geog, json_col, jsonb_col,xml_col from pg_e2e_source_table"""
       partition_column = "varchar_col"
       partition_num = 2
@@ -46,4 +46,4 @@ sink {
     table = public.pg_e2e_sink_table
     primary_keys = ["gid"]
   }
-}
\ No newline at end of file
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_copy_stmt.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_copy_stmt.conf
index 0bd44362ef..b8fa1b403d 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_copy_stmt.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_copy_stmt.conf
@@ -27,7 +27,7 @@ source{
         username = "test"
         password = "test"
         query ="""select gid, uuid_col, text_col, varchar_col, char_one_col, 
char_col, boolean_col, smallint_col, integer_col, bigint_col, decimal_col, 
numeric_col, real_col, double_precision_col,
-                         smallserial_col, serial_col, bigserial_col, date_col, 
timestamp_col, bpchar_col, age, name, point, linestring, polygon_colums, 
multipoint,
+                         smallserial_col, serial_col, bigserial_col, date_col, 
timestamp_col, timestamp_tz_col, bpchar_col, age, name, point, linestring, 
polygon_colums, multipoint,
                          multilinestring, multipolygon, geometrycollection, 
geog, json_col, jsonb_col,xml_col from pg_e2e_source_table"""
       partition_column = "varchar_col"
       partition_num = 2
@@ -47,4 +47,4 @@ sink {
     use_copy_statement = true
     primary_keys = ["gid"]
   }
-}
\ No newline at end of file
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel.conf
index 2d9cf1cb3b..f7775089ca 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel.conf
@@ -27,7 +27,7 @@ source{
         username = "test"
         password = "test"
         query ="""select gid, uuid_col, text_col, varchar_col, char_one_col, 
char_col, boolean_col, smallint_col, integer_col, bigint_col, decimal_col, 
numeric_col, real_col, double_precision_col,
-                         smallserial_col, serial_col, bigserial_col, date_col, 
timestamp_col, bpchar_col, age, name, point, linestring, polygon_colums, 
multipoint,
+                         smallserial_col, serial_col, bigserial_col, date_col, 
timestamp_col, timestamp_tz_col, bpchar_col, age, name, point, linestring, 
polygon_colums, multipoint,
                          multilinestring, multipolygon, geometrycollection, 
geog, json_col, jsonb_col,xml_col from pg_e2e_source_table"""
         partition_column= "gid"
 
@@ -46,8 +46,10 @@ sink {
         password = "test"
         connection_check_timeout_sec = 100
         query ="""INSERT INTO pg_e2e_sink_table ( gid, uuid_col, text_col, 
varchar_col, char_one_col, char_col, boolean_col, smallint_col, integer_col, 
bigint_col, decimal_col, numeric_col, real_col,
-                                                       double_precision_col, 
smallserial_col, serial_col, bigserial_col, date_col, timestamp_col, 
bpchar_col, age, name, point,
+                                                       double_precision_col, 
smallserial_col, serial_col, bigserial_col, date_col, timestamp_col, 
timestamp_tz_col, bpchar_col, age, name, point,
                                                        linestring, 
polygon_colums, multipoint, multilinestring, multipolygon, geometrycollection, 
geog, json_col, jsonb_col,xml_col)
-                                          VALUES( ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,? )"""
+                                          VALUES(:gid, :uuid_col, :text_col, 
:varchar_col, :char_one_col, :char_col, :boolean_col, :smallint_col, 
:integer_col, :bigint_col, :decimal_col, :numeric_col, :real_col,
+                                                 :double_precision_col, 
:smallserial_col, :serial_col, :bigserial_col, :date_col, :timestamp_col, 
:timestamp_tz_col, :bpchar_col, :age, :name, :point,
+                                                 :linestring, :polygon_colums, 
:multipoint, :multilinestring, :multipolygon, :geometrycollection, :geog, 
:json_col, :jsonb_col, :xml_col)"""
     }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel_upper_lower.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel_upper_lower.conf
index 22d56fa568..c8c53e08bc 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel_upper_lower.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel_upper_lower.conf
@@ -27,7 +27,7 @@ source{
         user = "test"
         password = "test"
         query ="""select gid, uuid_col, text_col, varchar_col, char_one_col, 
char_col, boolean_col, smallint_col, integer_col, bigint_col, decimal_col, 
numeric_col, real_col, double_precision_col,
-                         smallserial_col, serial_col, bigserial_col, date_col, 
timestamp_col, bpchar_col, age, name, point, linestring, polygon_colums, 
multipoint,
+                         smallserial_col, serial_col, bigserial_col, date_col, 
timestamp_col, timestamp_tz_col, bpchar_col, age, name, point, linestring, 
polygon_colums, multipoint,
                          multilinestring, multipolygon, geometrycollection, 
geog, json_col, jsonb_col,xml_col from pg_e2e_source_table"""
         partition_column= "gid"
 
@@ -50,8 +50,10 @@ sink {
         password = "test"
         connection_check_timeout_sec = 100
         query ="""INSERT INTO pg_e2e_sink_table ( gid, uuid_col, text_col, 
varchar_col, char_one_col, char_col, boolean_col, smallint_col, integer_col, 
bigint_col, decimal_col, numeric_col, real_col,
-                                                       double_precision_col, 
smallserial_col, serial_col, bigserial_col, date_col, timestamp_col, 
bpchar_col, age, name, point,
+                                                       double_precision_col, 
smallserial_col, serial_col, bigserial_col, date_col, timestamp_col, 
timestamp_tz_col, bpchar_col, age, name, point,
                                                        linestring, 
polygon_colums, multipoint, multilinestring, multipolygon, geometrycollection, 
geog, json_col, jsonb_col,xml_col )
-                                          VALUES( ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? ,?)"""
+                                          VALUES(:gid, :uuid_col, :text_col, 
:varchar_col, :char_one_col, :char_col, :boolean_col, :smallint_col, 
:integer_col, :bigint_col, :decimal_col, :numeric_col, :real_col,
+                                                 :double_precision_col, 
:smallserial_col, :serial_col, :bigserial_col, :date_col, :timestamp_col, 
:timestamp_tz_col, :bpchar_col, :age, :name, :point,
+                                                 :linestring, :polygon_colums, 
:multipoint, :multilinestring, :multipolygon, :geometrycollection, :geog, 
:json_col, :jsonb_col, :xml_col)"""
     }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_xa.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_xa.conf
index fc72a29f5f..5f484873e8 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_xa.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_xa.conf
@@ -27,7 +27,7 @@ source {
         username = "test"
         password = "test"
         query ="""select gid, uuid_col, text_col, varchar_col, char_one_col, 
char_col, boolean_col, smallint_col, integer_col, bigint_col, decimal_col, 
numeric_col, real_col, double_precision_col,
-                         smallserial_col, serial_col, bigserial_col, date_col, 
timestamp_col, bpchar_col, age, name, point, linestring, polygon_colums, 
multipoint,
+                         smallserial_col, serial_col, bigserial_col, date_col, 
timestamp_col, timestamp_tz_col, bpchar_col, age, name, point, linestring, 
polygon_colums, multipoint,
                          multilinestring, multipolygon, geometrycollection, 
geog, json_col, jsonb_col ,xml_col from pg_e2e_source_table"""
     }
 }
@@ -43,9 +43,11 @@ sink {
         password = "test"
         max_retries = 0
         query ="""INSERT INTO pg_e2e_sink_table ( gid, uuid_col, text_col, 
varchar_col, char_one_col, char_col, boolean_col, smallint_col, integer_col, 
bigint_col, decimal_col, numeric_col, real_col,
-                                                       double_precision_col, 
smallserial_col, serial_col, bigserial_col, date_col, timestamp_col, 
bpchar_col, age, name, point,
+                                                       double_precision_col, 
smallserial_col, serial_col, bigserial_col, date_col, timestamp_col, 
timestamp_tz_col, bpchar_col, age, name, point,
                                                        linestring, 
polygon_colums, multipoint, multilinestring, multipolygon, geometrycollection, 
geog, json_col, jsonb_col ,xml_col)
-                                          VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?)"""
+                                          VALUES(:gid, :uuid_col, :text_col, 
:varchar_col, :char_one_col, :char_col, :boolean_col, :smallint_col, 
:integer_col, :bigint_col, :decimal_col, :numeric_col, :real_col,
+                                                 :double_precision_col, 
:smallserial_col, :serial_col, :bigserial_col, :date_col, :timestamp_col, 
:timestamp_tz_col, :bpchar_col, :age, :name, :point,
+                                                 :linestring, :polygon_colums, 
:multipoint, :multilinestring, :multipolygon, :geometrycollection, :geog, 
:json_col, :jsonb_col, :xml_col)"""
 
         is_exactly_once = "true"
 

Reply via email to