This is an automated email from the ASF dual-hosted git repository.

timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git


The following commit(s) were added to refs/heads/master by this push:
     new c44179b7 IGNITE-26093 Expand types support for CDC with PostgreSQL 
(#319)
c44179b7 is described below

commit c44179b7c85b3419c9f4799349497dfa55d1c5c4
Author: Maksim Davydov <[email protected]>
AuthorDate: Thu Aug 21 11:56:04 2025 +0300

    IGNITE-26093 Expand types support for CDC with PostgreSQL (#319)
---
 .../postgresql/IgniteToPostgreSqlCdcApplier.java   | 180 ++--------
 .../ignite/cdc/postgresql/JavaToSqlTypeMapper.java | 226 +++++++++++++
 .../CdcPostgreSqlReplicationAbstractTest.java      |  24 +-
 .../CdcPostgreSqlReplicationAppsTest.java          |   2 +-
 .../CdcPostgreSqlReplicationTest.java              |  22 +-
 .../cdc/postgresql/JavaToSqlTypeMapperTest.java    | 364 +++++++++++++++++++++
 6 files changed, 665 insertions(+), 153 deletions(-)

diff --git 
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/IgniteToPostgreSqlCdcApplier.java
 
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/IgniteToPostgreSqlCdcApplier.java
index 4399880f..abe9a4bd 100644
--- 
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/IgniteToPostgreSqlCdcApplier.java
+++ 
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/IgniteToPostgreSqlCdcApplier.java
@@ -17,21 +17,18 @@
 
 package org.apache.ignite.cdc.postgresql;
 
-import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.sql.Types;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
 import javax.sql.DataSource;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
@@ -45,59 +42,7 @@ import org.apache.ignite.internal.util.typedef.F;
 import static 
org.apache.ignite.internal.processors.cache.GridCacheUtils.UNDEFINED_CACHE_ID;
 
 /** */
-public class IgniteToPostgreSqlCdcApplier {
-    /** */
-    public static final String DFLT_SQL_TYPE = "OTHER";
-
-    /** */
-    public static final Map<String, String> JAVA_TO_SQL_TYPES;
-
-    /** */
-    public static final Set<String> SQL_TYPES_WITH_PRECISION_ONLY;
-
-    /** */
-    public static final Set<String> SQL_TYPES_WITH_PRECISION_AND_SCALE;
-
-    static {
-        Map<String, String> javaToSqlTypes = new HashMap<>();
-
-        javaToSqlTypes.put("java.lang.String", "VARCHAR");
-        javaToSqlTypes.put("java.lang.Integer", "INT");
-        javaToSqlTypes.put("int", "INT");
-        javaToSqlTypes.put("java.lang.Long", "BIGINT");
-        javaToSqlTypes.put("long", "BIGINT");
-        javaToSqlTypes.put("java.lang.Boolean", "BOOLEAN");
-        javaToSqlTypes.put("boolean", "BOOLEAN");
-        javaToSqlTypes.put("java.lang.Double", "DOUBLE PRECISION");
-        javaToSqlTypes.put("double", "DOUBLE PRECISION");
-        javaToSqlTypes.put("java.lang.Float", "REAL");
-        javaToSqlTypes.put("float", "REAL");
-        javaToSqlTypes.put("java.math.BigDecimal", "DECIMAL");
-        javaToSqlTypes.put("java.lang.Short", "SMALLINT");
-        javaToSqlTypes.put("short", "SMALLINT");
-        javaToSqlTypes.put("java.lang.Byte", "SMALLINT");
-        javaToSqlTypes.put("byte", "SMALLINT");
-        javaToSqlTypes.put("java.util.UUID", "UUID");
-        javaToSqlTypes.put("[B", "BYTEA");
-        javaToSqlTypes.put("java.lang.Object", "OTHER");
-
-        JAVA_TO_SQL_TYPES = Collections.unmodifiableMap(javaToSqlTypes);
-
-        Set<String> sqlTypesWithPrecisionOnly = new HashSet<>();
-
-        sqlTypesWithPrecisionOnly.add("VARCHAR");
-        sqlTypesWithPrecisionOnly.add("DOUBLE PRECISION");
-
-        SQL_TYPES_WITH_PRECISION_ONLY = 
Collections.unmodifiableSet(sqlTypesWithPrecisionOnly);
-
-        Set<String> sqlTypesWithPrecisionAndScale = new HashSet<>();
-
-        sqlTypesWithPrecisionAndScale.add("DECIMAL");
-        sqlTypesWithPrecisionAndScale.add("REAL");
-
-        SQL_TYPES_WITH_PRECISION_AND_SCALE = 
Collections.unmodifiableSet(sqlTypesWithPrecisionAndScale);
-    }
-
+class IgniteToPostgreSqlCdcApplier {
     /** */
     private final DataSource dataSrc;
 
@@ -107,6 +52,9 @@ public class IgniteToPostgreSqlCdcApplier {
     /** */
     private final IgniteLogger log;
 
+    /** */
+    private final JavaToSqlTypeMapper javaToSqlTypeMapper = new 
JavaToSqlTypeMapper();
+
     /** */
     private final Map<Integer, String> cacheIdToUpsertQry = new HashMap<>();
 
@@ -175,10 +123,9 @@ public class IgniteToPostgreSqlCdcApplier {
         boolean prevOpIsDelete = false;
         
         PreparedStatement curPrepStmt = null;
-        CdcEvent evt;
 
         while (evts.hasNext()) {
-            evt = evts.next();
+            CdcEvent evt = evts.next();
 
             if (log.isDebugEnabled())
                 log.debug("Event received [evt=" + evt + ']');
@@ -275,8 +222,6 @@ public class IgniteToPostgreSqlCdcApplier {
                 cacheIdToPrimaryKeys.get(evt.cacheId()).iterator() :
                 cacheIdToFields.get(evt.cacheId()).iterator();
 
-            String field;
-
             BinaryObject keyObj = (evt.key() instanceof BinaryObject) ? 
(BinaryObject)evt.key() : null;
             BinaryObject valObj = (evt.value() instanceof BinaryObject) ? 
(BinaryObject)evt.value() : null;
 
@@ -284,14 +229,14 @@ public class IgniteToPostgreSqlCdcApplier {
             Object obj;
 
             while (itFields.hasNext()) {
-                field = itFields.next();
+                String field = itFields.next();
 
                 if (cacheIdToPrimaryKeys.get(evt.cacheId()).contains(field))
                     obj = keyObj != null ? keyObj.field(field) : evt.key();
                 else
                     obj = valObj != null ? valObj.field(field) : evt.value();
 
-                addObject(curPrepStmt, idx, obj);
+                javaToSqlTypeMapper.setValue(curPrepStmt, idx, obj);
 
                 idx++;
             }
@@ -310,81 +255,44 @@ public class IgniteToPostgreSqlCdcApplier {
         }
     }
 
-    /**
-     * Sets a value in the PreparedStatement at the given index using the 
appropriate setter
-     * based on the runtime type of the object.
-     * @param curPrepStmt {@link PreparedStatement}
-     * @param idx value index in {@link PreparedStatement}
-     * @param obj value
-     */
-    private void addObject(PreparedStatement curPrepStmt, int idx, Object obj) 
throws SQLException {
-        if (obj == null) {
-            curPrepStmt.setObject(idx, null);
-
-            return;
-        }
-
-        if (obj instanceof String)
-            curPrepStmt.setString(idx, (String)obj);
-        else if (obj instanceof Integer)
-            curPrepStmt.setInt(idx, (Integer)obj);
-        else if (obj instanceof Long)
-            curPrepStmt.setLong(idx, (Long)obj);
-        else if (obj instanceof Short)
-            curPrepStmt.setShort(idx, (Short)obj);
-        else if (obj instanceof Byte)
-            curPrepStmt.setByte(idx, (Byte)obj);
-        else if (obj instanceof Boolean)
-            curPrepStmt.setBoolean(idx, (Boolean)obj);
-        else if (obj instanceof Float)
-            curPrepStmt.setFloat(idx, (Float)obj);
-        else if (obj instanceof Double)
-            curPrepStmt.setDouble(idx, (Double)obj);
-        else if (obj instanceof BigDecimal)
-            curPrepStmt.setBigDecimal(idx, (BigDecimal)obj);
-        else if (obj instanceof UUID)
-            curPrepStmt.setObject(idx, obj, Types.OTHER); // PostgreSQL 
expects UUID as OTHER
-        else if (obj instanceof byte[])
-            curPrepStmt.setBytes(idx, (byte[])obj);
-        else
-            curPrepStmt.setObject(idx, obj);
-    }
-
     /**
      * @param evts an {@link Iterator} of {@link CdcCacheEvent} objects to 
apply
      * @param createTables tables creation flag. If true - attempt to create 
tables will be made.
      * @return Number of applied events.
      */
     public long applyCacheEvents(Iterator<CdcCacheEvent> evts, boolean 
createTables) {
-        CdcCacheEvent evt;
-        QueryEntity entity;
-
         long cnt = 0;
 
         while (evts.hasNext()) {
-            evt = evts.next();
+            CdcCacheEvent evt = evts.next();
 
             if (evt.queryEntities().size() != 1)
                 throw new IgniteException("There should be exactly 1 
QueryEntity for cacheId: " + evt.cacheId());
 
-            entity = evt.queryEntities().iterator().next();
+            try {
+                QueryEntity entity = evt.queryEntities().iterator().next();
 
-            if (createTables)
-                createTableIfNotExists(entity);
+                if (createTables)
+                    createTableIfNotExists(entity);
 
-            cacheIdToUpsertQry.put(evt.cacheId(), getUpsertSqlQry(entity));
+                cacheIdToUpsertQry.put(evt.cacheId(), getUpsertSqlQry(entity));
 
-            cacheIdToDeleteQry.put(evt.cacheId(), getDeleteSqlQry(entity));
+                cacheIdToDeleteQry.put(evt.cacheId(), getDeleteSqlQry(entity));
 
-            cacheIdToPrimaryKeys.put(evt.cacheId(), getPrimaryKeys(entity));
+                cacheIdToPrimaryKeys.put(evt.cacheId(), 
getPrimaryKeys(entity));
 
-            cacheIdToFields.put(evt.cacheId(), entity.getFields().keySet());
+                cacheIdToFields.put(evt.cacheId(), 
entity.getFields().keySet());
 
-            if (createTables && log.isInfoEnabled())
-                log.info("Cache table created [tableName=" + 
entity.getTableName() +
-                    ", columns=" + entity.getFields().keySet() + ']');
+                if (createTables && log.isInfoEnabled())
+                    log.info("Cache table created [tableName=" + 
entity.getTableName() +
+                        ", columns=" + entity.getFields().keySet() + ']');
 
-            cnt++;
+                cnt++;
+            }
+            catch (IgniteException e) {
+                throw new IgniteException("Error occurred while preparing SQL 
statements for CdcCacheEvent [cacheId=" +
+                     evt.cacheId() + "]. Exclude cache from replication or fix 
the issue: " + e.getMessage(), e);
+            }
         }
 
         return cnt;
@@ -437,33 +345,15 @@ public class IgniteToPostgreSqlCdcApplier {
     private void addFieldsAndTypes(QueryEntity entity, StringBuilder sql) {
         Iterator<Map.Entry<String, String>> iter = 
entity.getFields().entrySet().iterator();
 
-        Map.Entry<String, String> field;
-        String type;
-
-        Integer precision;
-        Integer scale;
-
         while (iter.hasNext()) {
-            field = iter.next();
-            type = JAVA_TO_SQL_TYPES.getOrDefault(field.getValue(), 
DFLT_SQL_TYPE);
-
-            sql.append(field.getKey()).append(" ").append(type);
+            Map.Entry<String, String> field = iter.next();
 
-            precision = entity.getFieldsPrecision().get(field.getKey());
-            scale = entity.getFieldsScale().get(field.getKey());
+            Integer precision = 
entity.getFieldsPrecision().get(field.getKey());
+            Integer scale = entity.getFieldsScale().get(field.getKey());
 
-            if (precision != null && precision > 0) {
-                if (SQL_TYPES_WITH_PRECISION_ONLY.contains(type))
-                    sql.append("(").append(precision).append(")");
-                else if (SQL_TYPES_WITH_PRECISION_AND_SCALE.contains(type)) {
-                    sql.append("(").append(precision);
+            String type = javaToSqlTypeMapper.renderSqlType(field.getValue(), 
precision, scale);
 
-                    if (scale != null && scale >= 0)
-                        sql.append(", ").append(scale);
-
-                    sql.append(")");
-                }
-            }
+            sql.append(field.getKey()).append(" ").append(type);
 
             if (iter.hasNext())
                 sql.append(", ");
@@ -524,10 +414,9 @@ public class IgniteToPostgreSqlCdcApplier {
      */
     private void addFields(QueryEntity entity, StringBuilder sql) {
         Iterator<Map.Entry<String, String>> iter = 
entity.getFields().entrySet().iterator();
-        Map.Entry<String, String> field;
 
         while (iter.hasNext()) {
-            field = iter.next();
+            Map.Entry<String, String> field = iter.next();
 
             sql.append(field.getKey());
 
@@ -547,12 +436,10 @@ public class IgniteToPostgreSqlCdcApplier {
 
         Iterator<String> itAllFields = F.concat(false, "version", 
entity.getFields().keySet()).iterator();
 
-        String field;
-
         boolean first = true;
 
         while (itAllFields.hasNext()) {
-            field = itAllFields.next();
+            String field = itAllFields.next();
 
             if (primaryFields.contains(field))
                 continue;
@@ -588,10 +475,9 @@ public class IgniteToPostgreSqlCdcApplier {
         StringBuilder deleteQry = new StringBuilder("DELETE FROM 
").append(entity.getTableName()).append(" WHERE (");
 
         Iterator<String> itKeys = getPrimaryKeys(entity).iterator();
-        String key;
 
         while (itKeys.hasNext()) {
-            key = itKeys.next();
+            String key = itKeys.next();
 
             deleteQry.append(key).append(" = ?");
 
diff --git 
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/JavaToSqlTypeMapper.java
 
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/JavaToSqlTypeMapper.java
new file mode 100644
index 00000000..708caeea
--- /dev/null
+++ 
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/JavaToSqlTypeMapper.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.postgresql;
+
+import java.math.BigDecimal;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.OffsetTime;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.IgniteException;
+
+/** */
+class JavaToSqlTypeMapper {
+    /** */
+    private static final Map<String, JavaToSqlType> JAVA_TO_SQL_TYPE_MAP = new 
HashMap<>();
+
+    static {
+        for (JavaToSqlType type : JavaToSqlType.values())
+            JAVA_TO_SQL_TYPE_MAP.put(type.javaTypeName(), type);
+    }
+
+    /**
+     * Sets a value in the PreparedStatement at the given index using the 
appropriate setter
+     * based on the runtime type of the object.
+     * @param stmt {@link PreparedStatement}
+     * @param idx value index in {@link PreparedStatement}
+     * @param obj value
+     */
+    public void setValue(PreparedStatement stmt, int idx, Object obj) {
+        try {
+            assert obj != null;
+
+            if (obj instanceof byte[]) {
+                stmt.setBytes(idx, (byte[])obj); // Preferred setter for byte[]
+
+                return;
+            }
+
+            JavaToSqlType type = 
JAVA_TO_SQL_TYPE_MAP.get(obj.getClass().getName());
+
+            if (type != null)
+                stmt.setObject(idx, obj, type.typeId());
+            else
+                throw new IgniteException("Java-to-SQL type mapping is not 
defined for class: "
+                    + obj.getClass().getName());
+        }
+        catch (SQLException e) {
+            throw new IgniteException("Failed to set value to statement 
[stmt=" + stmt + ", valueType=" +
+                obj.getClass().getName() + ", index=" + idx + ']', e);
+        }
+    }
+
+    /**
+     * Renders the SQL type declaration for a given Java class name based on 
its mapping,
+     * optionally including precision and scale if the SQL type supports them.
+     *
+     * @param clsName  the fully qualified Java class name used to look up the 
corresponding SQL type
+     * @param precision optional precision value to include in the SQL type, 
if supported
+     * @param scale     optional scale value to include in the SQL type, if 
supported
+     * @return the SQL type string with appropriate precision and scale 
formatting
+     */
+    public String renderSqlType(String clsName, Integer precision, Integer 
scale) {
+        JavaToSqlType type = JAVA_TO_SQL_TYPE_MAP.get(clsName);
+
+        if (type == null)
+            throw new IgniteException("Java-to-SQL type mapping is not defined 
for class: " + clsName);
+
+        String sqlType = type.sqlType();
+
+        if (type.precision() && precision != null) {
+            return type.scale() && scale != null
+                ? String.format("%s(%d, %d)", sqlType, precision, scale)
+                : String.format("%s(%d)", sqlType, precision);
+        }
+
+        return sqlType;
+    }
+
+    /** */
+    enum JavaToSqlType {
+        /** */
+        STRING(String.class, "VARCHAR", true, false, Types.VARCHAR),
+
+        /** */
+        INTEGER(Integer.class, "INT", false, false, Types.INTEGER),
+
+        /** */
+        LONG(Long.class, "BIGINT", false, false, Types.BIGINT),
+
+        /** */
+        BOOLEAN(Boolean.class, "BOOL", false, false, Types.BOOLEAN),
+
+        /** */
+        DOUBLE(Double.class, "NUMERIC", true, true, Types.DOUBLE),
+
+        /** */
+        FLOAT(Float.class, "NUMERIC", true, true, Types.FLOAT),
+
+        /** */
+        BIG_DECIMAL(BigDecimal.class, "NUMERIC", true, true, Types.DECIMAL),
+
+        /** */
+        SHORT(Short.class, "SMALLINT", false, false, Types.SMALLINT),
+
+        /** */
+        BYTE(Byte.class, "SMALLINT", false, false, Types.SMALLINT),
+
+        /** */
+        SQL_DATE(java.sql.Date.class, "DATE", false, false, Types.DATE),
+
+        /** */
+        SQL_TIME(java.sql.Time.class, "TIME", true, false, Types.TIME),
+
+        /** */
+        SQL_TIMESTAMP(java.sql.Timestamp.class, "TIMESTAMP", true, false, 
Types.TIMESTAMP),
+
+        /** */
+        UTIL_DATE(java.util.Date.class, "TIMESTAMP", true, false, 
Types.TIMESTAMP),
+
+        /** */
+        UUID_TYPE(UUID.class, "UUID", false, false, Types.OTHER),
+
+        /** */
+        LOCAL_DATE(LocalDate.class, "DATE", false, false, Types.DATE),
+
+        /** */
+        LOCAL_TIME(LocalTime.class, "TIME", true, false, Types.TIME),
+
+        /** */
+        LOCAL_DATE_TIME(LocalDateTime.class, "TIMESTAMP", true, false, 
Types.TIMESTAMP),
+
+        /** */
+        OFFSET_TIME(OffsetTime.class, "VARCHAR", true, false, Types.VARCHAR),
+
+        /** */
+        OFFSET_DATE_TIME(OffsetDateTime.class, "TIMESTAMP WITH TIME ZONE", 
false, false,
+            Types.TIMESTAMP_WITH_TIMEZONE),
+
+        /** */
+        BYTE_ARRAY(byte[].class, "BYTEA", false, false, Types.OTHER);
+
+        /** */
+        private final String javaTypeName;
+
+        /** */
+        private final String sqlType;
+
+        /** */
+        private final boolean precision;
+
+        /** */
+        private final boolean scale;
+
+        /** */
+        private final int typeId;
+
+        /**
+         * @param javaTypeCls Java type class.
+         * @param sqlType Sql type.
+         * @param precision Has precision.
+         * @param scale Has scale.
+         * @param typeId {@link Types}
+         */
+        JavaToSqlType(
+            Class<?> javaTypeCls,
+            String sqlType,
+            boolean precision,
+            boolean scale,
+            int typeId
+        ) {
+            this.javaTypeName = javaTypeCls.getName();
+            this.sqlType = sqlType;
+            this.precision = precision;
+            this.scale = scale;
+            this.typeId = typeId;
+        }
+
+        /** */
+        String javaTypeName() {
+            return javaTypeName;
+        }
+
+        /** */
+        String sqlType() {
+            return sqlType;
+        }
+
+        /** */
+        boolean precision() {
+            return precision;
+        }
+
+        /** */
+        boolean scale() {
+            return scale;
+        }
+
+        /** */
+        int typeId() {
+            return typeId;
+        }
+    }
+}
+
diff --git 
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgres/CdcPostgreSqlReplicationAbstractTest.java
 
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgresql/CdcPostgreSqlReplicationAbstractTest.java
similarity index 88%
rename from 
modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgres/CdcPostgreSqlReplicationAbstractTest.java
rename to 
modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgresql/CdcPostgreSqlReplicationAbstractTest.java
index 57da44b5..33d123cf 100644
--- 
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgres/CdcPostgreSqlReplicationAbstractTest.java
+++ 
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgresql/CdcPostgreSqlReplicationAbstractTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.cdc.postgres;
+package org.apache.ignite.cdc.postgresql;
 
 import java.sql.Connection;
 import java.sql.PreparedStatement;
@@ -29,18 +29,18 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cdc.CdcConfiguration;
-import org.apache.ignite.cdc.postgresql.IgniteToPostgreSqlCdcConsumer;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.util.function.ThrowableFunction;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
 import static org.apache.ignite.testframework.GridTestUtils.runAsync;
 
 /** */
-public class CdcPostgreSqlReplicationAbstractTest extends 
GridCommonAbstractTest {
+public abstract class CdcPostgreSqlReplicationAbstractTest extends 
GridCommonAbstractTest {
     /** */
     protected static final int BATCH_SIZE = 128;
 
@@ -68,6 +68,24 @@ public class CdcPostgreSqlReplicationAbstractTest extends 
GridCommonAbstractTest
         }
     }
 
+    /** */
+    protected boolean selectOnPostgreSqlAndAct(
+        EmbeddedPostgres postgres,
+        String qry,
+        ThrowableFunction<Boolean, ResultSet, SQLException> action
+    ) {
+        try (Connection conn = postgres.getPostgresDatabase().getConnection()) 
{
+            PreparedStatement stmt = conn.prepareStatement(qry);
+
+            try (ResultSet rs = stmt.executeQuery()) {
+                return action.apply(rs);
+            }
+        }
+        catch (SQLException e) {
+            throw new IgniteException(e);
+        }
+    }
+
     /** */
     protected void executeOnPostgreSql(EmbeddedPostgres postgres, String qry) {
         try (Connection conn = postgres.getPostgresDatabase().getConnection()) 
{
diff --git 
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgres/CdcPostgreSqlReplicationAppsTest.java
 
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgresql/CdcPostgreSqlReplicationAppsTest.java
similarity index 97%
rename from 
modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgres/CdcPostgreSqlReplicationAppsTest.java
rename to 
modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgresql/CdcPostgreSqlReplicationAppsTest.java
index d7c39555..f6eec625 100644
--- 
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgres/CdcPostgreSqlReplicationAppsTest.java
+++ 
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgresql/CdcPostgreSqlReplicationAppsTest.java
@@ -1,4 +1,4 @@
-package org.apache.ignite.cdc.postgres;
+package org.apache.ignite.cdc.postgresql;
 
 import java.util.HashMap;
 import java.util.Map;
diff --git 
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgres/CdcPostgreSqlReplicationTest.java
 
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgresql/CdcPostgreSqlReplicationTest.java
similarity index 97%
rename from 
modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgres/CdcPostgreSqlReplicationTest.java
rename to 
modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgresql/CdcPostgreSqlReplicationTest.java
index 468ff4df..60d80302 100644
--- 
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgres/CdcPostgreSqlReplicationTest.java
+++ 
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgresql/CdcPostgreSqlReplicationTest.java
@@ -15,8 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.cdc.postgres;
+package org.apache.ignite.cdc.postgresql;
 
+import java.net.URL;
 import java.sql.ResultSet;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -39,7 +40,6 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.QueryEntity;
-import org.apache.ignite.cdc.postgresql.IgniteToPostgreSqlCdcConsumer;
 import org.apache.ignite.cluster.ClusterState;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
@@ -56,6 +56,7 @@ import org.junit.runners.Parameterized;
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+import static org.junit.Assume.assumeTrue;
 
 /** */
 @RunWith(Parameterized.class)
@@ -462,6 +463,23 @@ public class CdcPostgreSqlReplicationTest extends 
CdcPostgreSqlReplicationAbstra
         testQueryEntityReplicationError(qryOnlyValName);
     }
 
+    /** */
+    @Test
+    public void testQueryWithUnknownClassMapper() throws 
IgniteCheckedException {
+        assumeTrue(createTables);
+
+        Class<?> unknownCls = URL.class;
+
+        QueryEntity qryUnknownCls = new QueryEntity()
+            .setTableName("qryKeyValName")
+            .setKeyFieldName("id")
+            .setValueFieldName("name")
+            .addQueryField("id", Integer.class.getName(), null)
+            .addQueryField("name", unknownCls.getName(), null);
+
+        testQueryEntityReplicationError(qryUnknownCls);
+    }
+
     /** */
     private void testQueryEntityReplicationError(QueryEntity qryEntity) throws 
IgniteInterruptedCheckedException {
         CacheConfiguration<Integer, TestVal> ccfg = new 
CacheConfiguration<Integer, TestVal>(qryEntity.getTableName())
diff --git 
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgresql/JavaToSqlTypeMapperTest.java
 
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgresql/JavaToSqlTypeMapperTest.java
new file mode 100644
index 00000000..2b358332
--- /dev/null
+++ 
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgresql/JavaToSqlTypeMapperTest.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.postgresql;
+
+import java.math.BigDecimal;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.OffsetTime;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import io.zonky.test.db.postgres.embedded.EmbeddedPostgres;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.junit.Test;
+
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/** */
+public class JavaToSqlTypeMapperTest extends 
CdcPostgreSqlReplicationAbstractTest {
+    /** Type mapping. */
+    private static final Map<Class<?>, String> TYPE_MAPPING = Map.of(
+        Byte.class, "int2",
+        Short.class, "int2",
+        Integer.class, "int4",
+        Long.class, "int8",
+        OffsetDateTime.class, "timestamptz"
+    );
+
+    /** */
+    private static final int PRECISION_EXTRA_TIME = 9;
+
+    /** */
+    private static final int PRECISION_EXTRA_DATE_TIME = 23;
+
+    /** */
+    private static final String KEY_COLUMN_NAME = "id";
+
+    /** */
+    private static final String VAL_COLUMN_NAME = "value";
+
+    /** */
+    private final JavaToSqlTypeMapper javaToSqlTypeMapper = new 
JavaToSqlTypeMapper();
+
+    /** */
+    private IgniteEx src;
+
+    /** */
+    private EmbeddedPostgres postgres;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        DataRegionConfiguration dataRegionConfiguration = new 
DataRegionConfiguration()
+            .setPersistenceEnabled(true)
+            .setCdcEnabled(true);
+
+        DataStorageConfiguration dataStorageConfiguration = new 
DataStorageConfiguration()
+            .setWalForceArchiveTimeout(5_000)
+            .setDefaultDataRegionConfiguration(dataRegionConfiguration);
+
+        cfg.setDataStorageConfiguration(dataStorageConfiguration);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteToPostgreSqlCdcConsumer 
getCdcConsumerConfiguration() {
+        IgniteToPostgreSqlCdcConsumer cdcCfg = 
super.getCdcConsumerConfiguration();
+
+        cdcCfg.setCreateTables(true);
+
+        return cdcCfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        cleanPersistenceDir();
+
+        src = startGrid(0);
+
+        src.cluster().state(ClusterState.ACTIVE);
+
+        postgres = EmbeddedPostgres.builder().start();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        postgres.close();
+
+        cleanPersistenceDir();
+    }
+
+    /** */
+    @Test
+    public void javaToPostgreSqlTypesMappingTest() throws Exception {
+        Set<TestCase> valuesToReplicate = getValuesToReplicate();
+
+        Set<String> cachesToReplicate = 
valuesToReplicate.stream().map(TestCase::tableName).collect(Collectors.toSet());
+
+        IgniteInternalFuture<?> fut = startIgniteToPostgreSqlCdcConsumer(
+            src.configuration(),
+            cachesToReplicate,
+            postgres.getPostgresDatabase()
+        );
+
+        valuesToReplicate.forEach(this::createCacheWithValue);
+
+        assertTrue(waitForCondition(waitForTablesCreatedOnPostgres(postgres, 
cachesToReplicate), getTestTimeout()));
+
+        for (String cache : cachesToReplicate)
+            assertTrue(waitForCondition(waitForTableSize(postgres, cache, 1), 
getTestTimeout()));
+
+        valuesToReplicate.forEach(this::checkTable);
+
+        fut.cancel();
+    }
+
+    /** */
+    private Set<TestCase> getValuesToReplicate() {
+        LocalDateTime locDateTime = LocalDateTime.of(1999, 8, 6, 23, 30, 3);
+
+        return Set.of(
+            new TestCase("string"),
+            new TestCase("string", 10),
+            new TestCase(5),
+            new TestCase(6L),
+            new TestCase(true),
+            new TestCase(23.0),
+            new TestCase(23.0, 4),
+            new TestCase(23.343, 10, 3),
+            new TestCase(23.0f),
+            new TestCase(33.0f, 4),
+            new TestCase(23.12f, 5, 2),
+            new TestCase(new BigDecimal("1")),
+            new TestCase(new BigDecimal("323"), 3),
+            new TestCase(new BigDecimal("1.623"), 5, 3),
+            new TestCase((short)23),
+            new TestCase((byte)33),
+            new TestCase(new Time(43830000)),
+            new TestCase(new Time(43830123), 3),
+            new TestCase(new Timestamp(1755000630000L)),
+            new TestCase(new Timestamp(1755000630123L), 3),
+            new TestCase(new Date(933960600000L)),
+            new TestCase(UUID.randomUUID()),
+            new TestCase(LocalDate.of(1999, 8, 6)),
+            new TestCase(LocalTime.of(12, 12, 12)),
+            new TestCase(LocalTime.of(12, 12, 12), 6),
+            new TestCase(locDateTime),
+            new TestCase(locDateTime, 4),
+            new TestCase(OffsetTime.now()),
+            new TestCase(OffsetTime.now(), 30),
+            new TestCase(OffsetDateTime.of(locDateTime, 
ZoneOffset.ofHours(3))),
+            new TestCase(new byte[]{1, 2, 3, 4})
+        );
+    }
+
+    /** */
+    private void createCacheWithValue(TestCase testCase) {
+        String clsName = testCase.value().getClass().getName();
+
+        QueryEntity qryEntity = new QueryEntity()
+            .setTableName(testCase.tableName())
+            .setKeyFieldName(KEY_COLUMN_NAME)
+            .setValueFieldName(VAL_COLUMN_NAME)
+            .addQueryField(KEY_COLUMN_NAME, Integer.class.getName(), null)
+            .addQueryField(VAL_COLUMN_NAME, clsName, null);
+
+        if (testCase.precision() != null)
+            qryEntity.setFieldsPrecision(Map.of(VAL_COLUMN_NAME, 
testCase.precision()));
+
+        if (testCase.scale() != null)
+            qryEntity.setFieldsScale(Map.of(VAL_COLUMN_NAME, 
testCase.scale()));
+
+        CacheConfiguration<Integer, Object> ccfg = new 
CacheConfiguration<Integer, Object>(qryEntity.getTableName())
+            .setQueryEntities(Collections.singletonList(qryEntity));
+
+        try (IgniteCache<Integer, Object> cache = src.getOrCreateCache(ccfg)) {
+            cache.put(1, testCase.value());
+        }
+    }
+
+    /** */
+    private void checkTable(TestCase testCase) {
+        selectOnPostgreSqlAndAct(postgres, "SELECT * FROM " + 
testCase.tableName(), (res) -> {
+            assertTrue(res.next());
+
+            checkValue(res, testCase.value());
+
+            ResultSetMetaData meta = res.getMetaData();
+
+            String actTypeName = meta.getColumnTypeName(2);
+
+            checkType(testCase.value(), actTypeName);
+
+            if (testCase.precision() != null)
+                checkValueMeta(meta, testCase.value(), testCase.precision(), 
testCase.scale());
+
+            return true;
+        });
+    }
+
+    /** */
+    private void checkValue(ResultSet res, Object val) throws SQLException {
+        if (val instanceof Boolean)
+            assert Objects.equals(res.getBoolean(VAL_COLUMN_NAME), val);
+        else if (val instanceof Double)
+            assert Objects.equals(res.getDouble(VAL_COLUMN_NAME), val);
+        else if (val instanceof Float)
+            assert Objects.equals(res.getFloat(VAL_COLUMN_NAME), val);
+        else if (val instanceof BigDecimal)
+            assert Objects.equals(res.getBigDecimal(VAL_COLUMN_NAME), val);
+        else if (val instanceof Timestamp )
+            assert Objects.equals(res.getTimestamp(VAL_COLUMN_NAME), val);
+        else if (val instanceof LocalDateTime)
+            assert Objects.equals(res.getTimestamp(VAL_COLUMN_NAME), 
Timestamp.valueOf((LocalDateTime)val));
+        else if (val instanceof Date)
+            assert Objects.equals(res.getTimestamp(VAL_COLUMN_NAME).getTime(), 
((Date)val).getTime());
+        else if (val instanceof OffsetDateTime)
+            assert Objects.equals(res.getObject(VAL_COLUMN_NAME, 
OffsetDateTime.class).withOffsetSameInstant(ZoneOffset.UTC),
+                ((OffsetDateTime)val).withOffsetSameInstant(ZoneOffset.UTC));
+        else if (val instanceof byte[])
+            assert Arrays.equals(res.getBytes(VAL_COLUMN_NAME), (byte[])val);
+        else
+            assert Objects.equals(res.getString(VAL_COLUMN_NAME), 
val.toString());
+    }
+
+    /**
+     * Verifies that the PostgreSQL type name matches the expected mapping for 
the given Java value.
+     *
+     * @param val          Java value to check.
+     * @param actTypeName  Actual PostgreSQL type name from metadata.
+     */
+    private void checkType(Object val, String actTypeName) {
+        String actual = actTypeName.toLowerCase();
+        String expected = TYPE_MAPPING.get(val.getClass());
+
+        if (expected == null) {
+            expected = javaToSqlTypeMapper
+                .renderSqlType(val.getClass().getName(), null, null)
+                .toLowerCase();
+        }
+
+        assert Objects.equals(actual, expected);
+    }
+
+    /**
+     * Verifies that the precision and scale reported by {@link 
ResultSetMetaData}
+     * match the expected values for the given value type.
+     *
+     * @param meta      The result set metadata to check.
+     * @param val       The value whose type determines expected 
precision/scale.
+     * @param precision The expected base precision (without temporal 
adjustments).
+     * @param scale     The expected scale, or {@code null} if not applicable.
+     * @throws SQLException If metadata retrieval fails.
+     */
+    private void checkValueMeta(ResultSetMetaData meta, Object val, Integer 
precision, Integer scale) throws SQLException {
+        int actualPrecision = meta.getPrecision(2);
+        int actualScale = meta.getScale(2);
+
+        int expectedPrecision = precision;
+
+        if (val instanceof Time || val instanceof LocalTime)
+            expectedPrecision += PRECISION_EXTRA_TIME;
+        else if (val instanceof LocalDateTime || val instanceof Date)
+            expectedPrecision += PRECISION_EXTRA_DATE_TIME;
+
+        assert Objects.equals(actualPrecision, expectedPrecision);
+
+        if (val instanceof LocalTime || val instanceof LocalDateTime || val 
instanceof Date)
+            assert Objects.equals(actualScale, precision);
+        else
+            assert scale == null || Objects.equals(actualScale, scale);
+    }
+
+    /** */
+    private static class TestCase {
+        /** */
+        private final Object val;
+
+        /** */
+        private Integer precision = null;
+
+        /** */
+        private Integer scale = null;
+
+        /** */
+        private TestCase(Object val) {
+            this.val = val;
+        }
+
+        /** */
+        private TestCase(Object val, Integer precision) {
+            this.val = val;
+            this.precision = precision;
+        }
+
+        /** */
+        private TestCase(Object val, Integer precision, Integer scale) {
+            this.val = val;
+            this.precision = precision;
+            this.scale = scale;
+        }
+
+        /** */
+        public Object value() {
+            return val;
+        }
+
+        /** */
+        public Integer precision() {
+            return precision;
+        }
+
+        /** */
+        public Integer scale() {
+            return scale;
+        }
+
+        /** */
+        public String tableName() {
+            return val.getClass().getName().replace('.', '_').replace('[', '_')
+                + (precision != null ? '_' + precision : "") + (scale != null 
? '_' + scale : "");
+        }
+    }
+}


Reply via email to