This is an automated email from the ASF dual-hosted git repository.
timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git
The following commit(s) were added to refs/heads/master by this push:
new c44179b7 IGNITE-26093 Expand types support for CDC with PostgreSQL
(#319)
c44179b7 is described below
commit c44179b7c85b3419c9f4799349497dfa55d1c5c4
Author: Maksim Davydov <[email protected]>
AuthorDate: Thu Aug 21 11:56:04 2025 +0300
IGNITE-26093 Expand types support for CDC with PostgreSQL (#319)
---
.../postgresql/IgniteToPostgreSqlCdcApplier.java | 180 ++--------
.../ignite/cdc/postgresql/JavaToSqlTypeMapper.java | 226 +++++++++++++
.../CdcPostgreSqlReplicationAbstractTest.java | 24 +-
.../CdcPostgreSqlReplicationAppsTest.java | 2 +-
.../CdcPostgreSqlReplicationTest.java | 22 +-
.../cdc/postgresql/JavaToSqlTypeMapperTest.java | 364 +++++++++++++++++++++
6 files changed, 665 insertions(+), 153 deletions(-)
diff --git
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/IgniteToPostgreSqlCdcApplier.java
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/IgniteToPostgreSqlCdcApplier.java
index 4399880f..abe9a4bd 100644
---
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/IgniteToPostgreSqlCdcApplier.java
+++
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/IgniteToPostgreSqlCdcApplier.java
@@ -17,21 +17,18 @@
package org.apache.ignite.cdc.postgresql;
-import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
-import java.sql.Types;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
-import java.util.UUID;
import javax.sql.DataSource;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
@@ -45,59 +42,7 @@ import org.apache.ignite.internal.util.typedef.F;
import static
org.apache.ignite.internal.processors.cache.GridCacheUtils.UNDEFINED_CACHE_ID;
/** */
-public class IgniteToPostgreSqlCdcApplier {
- /** */
- public static final String DFLT_SQL_TYPE = "OTHER";
-
- /** */
- public static final Map<String, String> JAVA_TO_SQL_TYPES;
-
- /** */
- public static final Set<String> SQL_TYPES_WITH_PRECISION_ONLY;
-
- /** */
- public static final Set<String> SQL_TYPES_WITH_PRECISION_AND_SCALE;
-
- static {
- Map<String, String> javaToSqlTypes = new HashMap<>();
-
- javaToSqlTypes.put("java.lang.String", "VARCHAR");
- javaToSqlTypes.put("java.lang.Integer", "INT");
- javaToSqlTypes.put("int", "INT");
- javaToSqlTypes.put("java.lang.Long", "BIGINT");
- javaToSqlTypes.put("long", "BIGINT");
- javaToSqlTypes.put("java.lang.Boolean", "BOOLEAN");
- javaToSqlTypes.put("boolean", "BOOLEAN");
- javaToSqlTypes.put("java.lang.Double", "DOUBLE PRECISION");
- javaToSqlTypes.put("double", "DOUBLE PRECISION");
- javaToSqlTypes.put("java.lang.Float", "REAL");
- javaToSqlTypes.put("float", "REAL");
- javaToSqlTypes.put("java.math.BigDecimal", "DECIMAL");
- javaToSqlTypes.put("java.lang.Short", "SMALLINT");
- javaToSqlTypes.put("short", "SMALLINT");
- javaToSqlTypes.put("java.lang.Byte", "SMALLINT");
- javaToSqlTypes.put("byte", "SMALLINT");
- javaToSqlTypes.put("java.util.UUID", "UUID");
- javaToSqlTypes.put("[B", "BYTEA");
- javaToSqlTypes.put("java.lang.Object", "OTHER");
-
- JAVA_TO_SQL_TYPES = Collections.unmodifiableMap(javaToSqlTypes);
-
- Set<String> sqlTypesWithPrecisionOnly = new HashSet<>();
-
- sqlTypesWithPrecisionOnly.add("VARCHAR");
- sqlTypesWithPrecisionOnly.add("DOUBLE PRECISION");
-
- SQL_TYPES_WITH_PRECISION_ONLY =
Collections.unmodifiableSet(sqlTypesWithPrecisionOnly);
-
- Set<String> sqlTypesWithPrecisionAndScale = new HashSet<>();
-
- sqlTypesWithPrecisionAndScale.add("DECIMAL");
- sqlTypesWithPrecisionAndScale.add("REAL");
-
- SQL_TYPES_WITH_PRECISION_AND_SCALE =
Collections.unmodifiableSet(sqlTypesWithPrecisionAndScale);
- }
-
+class IgniteToPostgreSqlCdcApplier {
/** */
private final DataSource dataSrc;
@@ -107,6 +52,9 @@ public class IgniteToPostgreSqlCdcApplier {
/** */
private final IgniteLogger log;
+ /** */
+ private final JavaToSqlTypeMapper javaToSqlTypeMapper = new
JavaToSqlTypeMapper();
+
/** */
private final Map<Integer, String> cacheIdToUpsertQry = new HashMap<>();
@@ -175,10 +123,9 @@ public class IgniteToPostgreSqlCdcApplier {
boolean prevOpIsDelete = false;
PreparedStatement curPrepStmt = null;
- CdcEvent evt;
while (evts.hasNext()) {
- evt = evts.next();
+ CdcEvent evt = evts.next();
if (log.isDebugEnabled())
log.debug("Event received [evt=" + evt + ']');
@@ -275,8 +222,6 @@ public class IgniteToPostgreSqlCdcApplier {
cacheIdToPrimaryKeys.get(evt.cacheId()).iterator() :
cacheIdToFields.get(evt.cacheId()).iterator();
- String field;
-
BinaryObject keyObj = (evt.key() instanceof BinaryObject) ?
(BinaryObject)evt.key() : null;
BinaryObject valObj = (evt.value() instanceof BinaryObject) ?
(BinaryObject)evt.value() : null;
@@ -284,14 +229,14 @@ public class IgniteToPostgreSqlCdcApplier {
Object obj;
while (itFields.hasNext()) {
- field = itFields.next();
+ String field = itFields.next();
if (cacheIdToPrimaryKeys.get(evt.cacheId()).contains(field))
obj = keyObj != null ? keyObj.field(field) : evt.key();
else
obj = valObj != null ? valObj.field(field) : evt.value();
- addObject(curPrepStmt, idx, obj);
+ javaToSqlTypeMapper.setValue(curPrepStmt, idx, obj);
idx++;
}
@@ -310,81 +255,44 @@ public class IgniteToPostgreSqlCdcApplier {
}
}
- /**
- * Sets a value in the PreparedStatement at the given index using the
appropriate setter
- * based on the runtime type of the object.
- * @param curPrepStmt {@link PreparedStatement}
- * @param idx value index in {@link PreparedStatement}
- * @param obj value
- */
- private void addObject(PreparedStatement curPrepStmt, int idx, Object obj)
throws SQLException {
- if (obj == null) {
- curPrepStmt.setObject(idx, null);
-
- return;
- }
-
- if (obj instanceof String)
- curPrepStmt.setString(idx, (String)obj);
- else if (obj instanceof Integer)
- curPrepStmt.setInt(idx, (Integer)obj);
- else if (obj instanceof Long)
- curPrepStmt.setLong(idx, (Long)obj);
- else if (obj instanceof Short)
- curPrepStmt.setShort(idx, (Short)obj);
- else if (obj instanceof Byte)
- curPrepStmt.setByte(idx, (Byte)obj);
- else if (obj instanceof Boolean)
- curPrepStmt.setBoolean(idx, (Boolean)obj);
- else if (obj instanceof Float)
- curPrepStmt.setFloat(idx, (Float)obj);
- else if (obj instanceof Double)
- curPrepStmt.setDouble(idx, (Double)obj);
- else if (obj instanceof BigDecimal)
- curPrepStmt.setBigDecimal(idx, (BigDecimal)obj);
- else if (obj instanceof UUID)
- curPrepStmt.setObject(idx, obj, Types.OTHER); // PostgreSQL
expects UUID as OTHER
- else if (obj instanceof byte[])
- curPrepStmt.setBytes(idx, (byte[])obj);
- else
- curPrepStmt.setObject(idx, obj);
- }
-
/**
* @param evts an {@link Iterator} of {@link CdcCacheEvent} objects to
apply
* @param createTables tables creation flag. If true - attempt to create
tables will be made.
* @return Number of applied events.
*/
public long applyCacheEvents(Iterator<CdcCacheEvent> evts, boolean
createTables) {
- CdcCacheEvent evt;
- QueryEntity entity;
-
long cnt = 0;
while (evts.hasNext()) {
- evt = evts.next();
+ CdcCacheEvent evt = evts.next();
if (evt.queryEntities().size() != 1)
throw new IgniteException("There should be exactly 1
QueryEntity for cacheId: " + evt.cacheId());
- entity = evt.queryEntities().iterator().next();
+ try {
+ QueryEntity entity = evt.queryEntities().iterator().next();
- if (createTables)
- createTableIfNotExists(entity);
+ if (createTables)
+ createTableIfNotExists(entity);
- cacheIdToUpsertQry.put(evt.cacheId(), getUpsertSqlQry(entity));
+ cacheIdToUpsertQry.put(evt.cacheId(), getUpsertSqlQry(entity));
- cacheIdToDeleteQry.put(evt.cacheId(), getDeleteSqlQry(entity));
+ cacheIdToDeleteQry.put(evt.cacheId(), getDeleteSqlQry(entity));
- cacheIdToPrimaryKeys.put(evt.cacheId(), getPrimaryKeys(entity));
+ cacheIdToPrimaryKeys.put(evt.cacheId(),
getPrimaryKeys(entity));
- cacheIdToFields.put(evt.cacheId(), entity.getFields().keySet());
+ cacheIdToFields.put(evt.cacheId(),
entity.getFields().keySet());
- if (createTables && log.isInfoEnabled())
- log.info("Cache table created [tableName=" +
entity.getTableName() +
- ", columns=" + entity.getFields().keySet() + ']');
+ if (createTables && log.isInfoEnabled())
+ log.info("Cache table created [tableName=" +
entity.getTableName() +
+ ", columns=" + entity.getFields().keySet() + ']');
- cnt++;
+ cnt++;
+ }
+ catch (IgniteException e) {
+ throw new IgniteException("Error occurred while preparing SQL
statements for CdcCacheEvent [cacheId=" +
+ evt.cacheId() + "]. Exclude cache from replication or fix
the issue: " + e.getMessage(), e);
+ }
}
return cnt;
@@ -437,33 +345,15 @@ public class IgniteToPostgreSqlCdcApplier {
private void addFieldsAndTypes(QueryEntity entity, StringBuilder sql) {
Iterator<Map.Entry<String, String>> iter =
entity.getFields().entrySet().iterator();
- Map.Entry<String, String> field;
- String type;
-
- Integer precision;
- Integer scale;
-
while (iter.hasNext()) {
- field = iter.next();
- type = JAVA_TO_SQL_TYPES.getOrDefault(field.getValue(),
DFLT_SQL_TYPE);
-
- sql.append(field.getKey()).append(" ").append(type);
+ Map.Entry<String, String> field = iter.next();
- precision = entity.getFieldsPrecision().get(field.getKey());
- scale = entity.getFieldsScale().get(field.getKey());
+ Integer precision =
entity.getFieldsPrecision().get(field.getKey());
+ Integer scale = entity.getFieldsScale().get(field.getKey());
- if (precision != null && precision > 0) {
- if (SQL_TYPES_WITH_PRECISION_ONLY.contains(type))
- sql.append("(").append(precision).append(")");
- else if (SQL_TYPES_WITH_PRECISION_AND_SCALE.contains(type)) {
- sql.append("(").append(precision);
+ String type = javaToSqlTypeMapper.renderSqlType(field.getValue(),
precision, scale);
- if (scale != null && scale >= 0)
- sql.append(", ").append(scale);
-
- sql.append(")");
- }
- }
+ sql.append(field.getKey()).append(" ").append(type);
if (iter.hasNext())
sql.append(", ");
@@ -524,10 +414,9 @@ public class IgniteToPostgreSqlCdcApplier {
*/
private void addFields(QueryEntity entity, StringBuilder sql) {
Iterator<Map.Entry<String, String>> iter =
entity.getFields().entrySet().iterator();
- Map.Entry<String, String> field;
while (iter.hasNext()) {
- field = iter.next();
+ Map.Entry<String, String> field = iter.next();
sql.append(field.getKey());
@@ -547,12 +436,10 @@ public class IgniteToPostgreSqlCdcApplier {
Iterator<String> itAllFields = F.concat(false, "version",
entity.getFields().keySet()).iterator();
- String field;
-
boolean first = true;
while (itAllFields.hasNext()) {
- field = itAllFields.next();
+ String field = itAllFields.next();
if (primaryFields.contains(field))
continue;
@@ -588,10 +475,9 @@ public class IgniteToPostgreSqlCdcApplier {
StringBuilder deleteQry = new StringBuilder("DELETE FROM
").append(entity.getTableName()).append(" WHERE (");
Iterator<String> itKeys = getPrimaryKeys(entity).iterator();
- String key;
while (itKeys.hasNext()) {
- key = itKeys.next();
+ String key = itKeys.next();
deleteQry.append(key).append(" = ?");
diff --git
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/JavaToSqlTypeMapper.java
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/JavaToSqlTypeMapper.java
new file mode 100644
index 00000000..708caeea
--- /dev/null
+++
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/JavaToSqlTypeMapper.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.postgresql;
+
+import java.math.BigDecimal;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.OffsetTime;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.IgniteException;
+
+/** */
+class JavaToSqlTypeMapper {
+ /** */
+ private static final Map<String, JavaToSqlType> JAVA_TO_SQL_TYPE_MAP = new
HashMap<>();
+
+ static {
+ for (JavaToSqlType type : JavaToSqlType.values())
+ JAVA_TO_SQL_TYPE_MAP.put(type.javaTypeName(), type);
+ }
+
+ /**
+ * Sets a value in the PreparedStatement at the given index using the
appropriate setter
+ * based on the runtime type of the object.
+ * @param stmt {@link PreparedStatement}
+ * @param idx value index in {@link PreparedStatement}
+ * @param obj value
+ */
+ public void setValue(PreparedStatement stmt, int idx, Object obj) {
+ try {
+ assert obj != null;
+
+ if (obj instanceof byte[]) {
+ stmt.setBytes(idx, (byte[])obj); // Preferred setter for byte[]
+
+ return;
+ }
+
+ JavaToSqlType type =
JAVA_TO_SQL_TYPE_MAP.get(obj.getClass().getName());
+
+ if (type != null)
+ stmt.setObject(idx, obj, type.typeId());
+ else
+ throw new IgniteException("Java-to-SQL type mapping is not
defined for class: "
+ + obj.getClass().getName());
+ }
+ catch (SQLException e) {
+ throw new IgniteException("Failed to set value to statement
[stmt=" + stmt + ", valueType=" +
+ obj.getClass().getName() + ", index=" + idx + ']', e);
+ }
+ }
+
+ /**
+ * Renders the SQL type declaration for a given Java class name based on
its mapping,
+ * optionally including precision and scale if the SQL type supports them.
+ *
+ * @param clsName the fully qualified Java class name used to look up the
corresponding SQL type
+ * @param precision optional precision value to include in the SQL type,
if supported
+ * @param scale optional scale value to include in the SQL type, if
supported
+ * @return the SQL type string with appropriate precision and scale
formatting
+ */
+ public String renderSqlType(String clsName, Integer precision, Integer
scale) {
+ JavaToSqlType type = JAVA_TO_SQL_TYPE_MAP.get(clsName);
+
+ if (type == null)
+ throw new IgniteException("Java-to-SQL type mapping is not defined
for class: " + clsName);
+
+ String sqlType = type.sqlType();
+
+ if (type.precision() && precision != null) {
+ return type.scale() && scale != null
+ ? String.format("%s(%d, %d)", sqlType, precision, scale)
+ : String.format("%s(%d)", sqlType, precision);
+ }
+
+ return sqlType;
+ }
+
+ /** */
+ enum JavaToSqlType {
+ /** */
+ STRING(String.class, "VARCHAR", true, false, Types.VARCHAR),
+
+ /** */
+ INTEGER(Integer.class, "INT", false, false, Types.INTEGER),
+
+ /** */
+ LONG(Long.class, "BIGINT", false, false, Types.BIGINT),
+
+ /** */
+ BOOLEAN(Boolean.class, "BOOL", false, false, Types.BOOLEAN),
+
+ /** */
+ DOUBLE(Double.class, "NUMERIC", true, true, Types.DOUBLE),
+
+ /** */
+ FLOAT(Float.class, "NUMERIC", true, true, Types.FLOAT),
+
+ /** */
+ BIG_DECIMAL(BigDecimal.class, "NUMERIC", true, true, Types.DECIMAL),
+
+ /** */
+ SHORT(Short.class, "SMALLINT", false, false, Types.SMALLINT),
+
+ /** */
+ BYTE(Byte.class, "SMALLINT", false, false, Types.SMALLINT),
+
+ /** */
+ SQL_DATE(java.sql.Date.class, "DATE", false, false, Types.DATE),
+
+ /** */
+ SQL_TIME(java.sql.Time.class, "TIME", true, false, Types.TIME),
+
+ /** */
+ SQL_TIMESTAMP(java.sql.Timestamp.class, "TIMESTAMP", true, false,
Types.TIMESTAMP),
+
+ /** */
+ UTIL_DATE(java.util.Date.class, "TIMESTAMP", true, false,
Types.TIMESTAMP),
+
+ /** */
+ UUID_TYPE(UUID.class, "UUID", false, false, Types.OTHER),
+
+ /** */
+ LOCAL_DATE(LocalDate.class, "DATE", false, false, Types.DATE),
+
+ /** */
+ LOCAL_TIME(LocalTime.class, "TIME", true, false, Types.TIME),
+
+ /** */
+ LOCAL_DATE_TIME(LocalDateTime.class, "TIMESTAMP", true, false,
Types.TIMESTAMP),
+
+ /** */
+ OFFSET_TIME(OffsetTime.class, "VARCHAR", true, false, Types.VARCHAR),
+
+ /** */
+ OFFSET_DATE_TIME(OffsetDateTime.class, "TIMESTAMP WITH TIME ZONE",
false, false,
+ Types.TIMESTAMP_WITH_TIMEZONE),
+
+ /** */
+ BYTE_ARRAY(byte[].class, "BYTEA", false, false, Types.OTHER);
+
+ /** */
+ private final String javaTypeName;
+
+ /** */
+ private final String sqlType;
+
+ /** */
+ private final boolean precision;
+
+ /** */
+ private final boolean scale;
+
+ /** */
+ private final int typeId;
+
+ /**
+ * @param javaTypeCls Java type class.
+ * @param sqlType Sql type.
+ * @param precision Has precision.
+ * @param scale Has scale.
+ * @param typeId {@link Types}
+ */
+ JavaToSqlType(
+ Class<?> javaTypeCls,
+ String sqlType,
+ boolean precision,
+ boolean scale,
+ int typeId
+ ) {
+ this.javaTypeName = javaTypeCls.getName();
+ this.sqlType = sqlType;
+ this.precision = precision;
+ this.scale = scale;
+ this.typeId = typeId;
+ }
+
+ /** */
+ String javaTypeName() {
+ return javaTypeName;
+ }
+
+ /** */
+ String sqlType() {
+ return sqlType;
+ }
+
+ /** */
+ boolean precision() {
+ return precision;
+ }
+
+ /** */
+ boolean scale() {
+ return scale;
+ }
+
+ /** */
+ int typeId() {
+ return typeId;
+ }
+ }
+}
+
diff --git
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgres/CdcPostgreSqlReplicationAbstractTest.java
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgresql/CdcPostgreSqlReplicationAbstractTest.java
similarity index 88%
rename from
modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgres/CdcPostgreSqlReplicationAbstractTest.java
rename to
modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgresql/CdcPostgreSqlReplicationAbstractTest.java
index 57da44b5..33d123cf 100644
---
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgres/CdcPostgreSqlReplicationAbstractTest.java
+++
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgresql/CdcPostgreSqlReplicationAbstractTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.cdc.postgres;
+package org.apache.ignite.cdc.postgresql;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -29,18 +29,18 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cdc.CdcConfiguration;
-import org.apache.ignite.cdc.postgresql.IgniteToPostgreSqlCdcConsumer;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.util.function.ThrowableFunction;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
/** */
-public class CdcPostgreSqlReplicationAbstractTest extends
GridCommonAbstractTest {
+public abstract class CdcPostgreSqlReplicationAbstractTest extends
GridCommonAbstractTest {
/** */
protected static final int BATCH_SIZE = 128;
@@ -68,6 +68,24 @@ public class CdcPostgreSqlReplicationAbstractTest extends
GridCommonAbstractTest
}
}
+ /** */
+ protected boolean selectOnPostgreSqlAndAct(
+ EmbeddedPostgres postgres,
+ String qry,
+ ThrowableFunction<Boolean, ResultSet, SQLException> action
+ ) {
+ try (Connection conn = postgres.getPostgresDatabase().getConnection())
{
+ PreparedStatement stmt = conn.prepareStatement(qry);
+
+ try (ResultSet rs = stmt.executeQuery()) {
+ return action.apply(rs);
+ }
+ }
+ catch (SQLException e) {
+ throw new IgniteException(e);
+ }
+ }
+
/** */
protected void executeOnPostgreSql(EmbeddedPostgres postgres, String qry) {
try (Connection conn = postgres.getPostgresDatabase().getConnection())
{
diff --git
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgres/CdcPostgreSqlReplicationAppsTest.java
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgresql/CdcPostgreSqlReplicationAppsTest.java
similarity index 97%
rename from
modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgres/CdcPostgreSqlReplicationAppsTest.java
rename to
modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgresql/CdcPostgreSqlReplicationAppsTest.java
index d7c39555..f6eec625 100644
---
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgres/CdcPostgreSqlReplicationAppsTest.java
+++
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgresql/CdcPostgreSqlReplicationAppsTest.java
@@ -1,4 +1,4 @@
-package org.apache.ignite.cdc.postgres;
+package org.apache.ignite.cdc.postgresql;
import java.util.HashMap;
import java.util.Map;
diff --git
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgres/CdcPostgreSqlReplicationTest.java
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgresql/CdcPostgreSqlReplicationTest.java
similarity index 97%
rename from
modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgres/CdcPostgreSqlReplicationTest.java
rename to
modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgresql/CdcPostgreSqlReplicationTest.java
index 468ff4df..60d80302 100644
---
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgres/CdcPostgreSqlReplicationTest.java
+++
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgresql/CdcPostgreSqlReplicationTest.java
@@ -15,8 +15,9 @@
* limitations under the License.
*/
-package org.apache.ignite.cdc.postgres;
+package org.apache.ignite.cdc.postgresql;
+import java.net.URL;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Arrays;
@@ -39,7 +40,6 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.QueryEntity;
-import org.apache.ignite.cdc.postgresql.IgniteToPostgreSqlCdcConsumer;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
@@ -56,6 +56,7 @@ import org.junit.runners.Parameterized;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+import static org.junit.Assume.assumeTrue;
/** */
@RunWith(Parameterized.class)
@@ -462,6 +463,23 @@ public class CdcPostgreSqlReplicationTest extends
CdcPostgreSqlReplicationAbstra
testQueryEntityReplicationError(qryOnlyValName);
}
+ /** */
+ @Test
+ public void testQueryWithUnknownClassMapper() throws
IgniteCheckedException {
+ assumeTrue(createTables);
+
+ Class<?> unknownCls = URL.class;
+
+ QueryEntity qryUnknownCls = new QueryEntity()
+ .setTableName("qryKeyValName")
+ .setKeyFieldName("id")
+ .setValueFieldName("name")
+ .addQueryField("id", Integer.class.getName(), null)
+ .addQueryField("name", unknownCls.getName(), null);
+
+ testQueryEntityReplicationError(qryUnknownCls);
+ }
+
/** */
private void testQueryEntityReplicationError(QueryEntity qryEntity) throws
IgniteInterruptedCheckedException {
CacheConfiguration<Integer, TestVal> ccfg = new
CacheConfiguration<Integer, TestVal>(qryEntity.getTableName())
diff --git
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgresql/JavaToSqlTypeMapperTest.java
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgresql/JavaToSqlTypeMapperTest.java
new file mode 100644
index 00000000..2b358332
--- /dev/null
+++
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgresql/JavaToSqlTypeMapperTest.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.postgresql;
+
+import java.math.BigDecimal;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.OffsetTime;
+import java.time.ZoneOffset;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import io.zonky.test.db.postgres.embedded.EmbeddedPostgres;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.junit.Test;
+
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/** */
+public class JavaToSqlTypeMapperTest extends
CdcPostgreSqlReplicationAbstractTest {
+ /** Type mapping. */
+ private static final Map<Class<?>, String> TYPE_MAPPING = Map.of(
+ Byte.class, "int2",
+ Short.class, "int2",
+ Integer.class, "int4",
+ Long.class, "int8",
+ OffsetDateTime.class, "timestamptz"
+ );
+
+ /** */
+ private static final int PRECISION_EXTRA_TIME = 9;
+
+ /** */
+ private static final int PRECISION_EXTRA_DATE_TIME = 23;
+
+ /** */
+ private static final String KEY_COLUMN_NAME = "id";
+
+ /** */
+ private static final String VAL_COLUMN_NAME = "value";
+
+ /** */
+ private final JavaToSqlTypeMapper javaToSqlTypeMapper = new
JavaToSqlTypeMapper();
+
+ /** */
+ private IgniteEx src;
+
+ /** */
+ private EmbeddedPostgres postgres;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ DataRegionConfiguration dataRegionConfiguration = new
DataRegionConfiguration()
+ .setPersistenceEnabled(true)
+ .setCdcEnabled(true);
+
+ DataStorageConfiguration dataStorageConfiguration = new
DataStorageConfiguration()
+ .setWalForceArchiveTimeout(5_000)
+ .setDefaultDataRegionConfiguration(dataRegionConfiguration);
+
+ cfg.setDataStorageConfiguration(dataStorageConfiguration);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteToPostgreSqlCdcConsumer
getCdcConsumerConfiguration() {
+ IgniteToPostgreSqlCdcConsumer cdcCfg =
super.getCdcConsumerConfiguration();
+
+ cdcCfg.setCreateTables(true);
+
+ return cdcCfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ cleanPersistenceDir();
+
+ src = startGrid(0);
+
+ src.cluster().state(ClusterState.ACTIVE);
+
+ postgres = EmbeddedPostgres.builder().start();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ postgres.close();
+
+ cleanPersistenceDir();
+ }
+
+ /** */
+ @Test
+ public void javaToPostgreSqlTypesMappingTest() throws Exception {
+ Set<TestCase> valuesToReplicate = getValuesToReplicate();
+
+ Set<String> cachesToReplicate =
valuesToReplicate.stream().map(TestCase::tableName).collect(Collectors.toSet());
+
+ IgniteInternalFuture<?> fut = startIgniteToPostgreSqlCdcConsumer(
+ src.configuration(),
+ cachesToReplicate,
+ postgres.getPostgresDatabase()
+ );
+
+ valuesToReplicate.forEach(this::createCacheWithValue);
+
+ assertTrue(waitForCondition(waitForTablesCreatedOnPostgres(postgres,
cachesToReplicate), getTestTimeout()));
+
+ for (String cache : cachesToReplicate)
+ assertTrue(waitForCondition(waitForTableSize(postgres, cache, 1),
getTestTimeout()));
+
+ valuesToReplicate.forEach(this::checkTable);
+
+ fut.cancel();
+ }
+
+ /** */
+ private Set<TestCase> getValuesToReplicate() {
+ LocalDateTime locDateTime = LocalDateTime.of(1999, 8, 6, 23, 30, 3);
+
+ return Set.of(
+ new TestCase("string"),
+ new TestCase("string", 10),
+ new TestCase(5),
+ new TestCase(6L),
+ new TestCase(true),
+ new TestCase(23.0),
+ new TestCase(23.0, 4),
+ new TestCase(23.343, 10, 3),
+ new TestCase(23.0f),
+ new TestCase(33.0f, 4),
+ new TestCase(23.12f, 5, 2),
+ new TestCase(new BigDecimal("1")),
+ new TestCase(new BigDecimal("323"), 3),
+ new TestCase(new BigDecimal("1.623"), 5, 3),
+ new TestCase((short)23),
+ new TestCase((byte)33),
+ new TestCase(new Time(43830000)),
+ new TestCase(new Time(43830123), 3),
+ new TestCase(new Timestamp(1755000630000L)),
+ new TestCase(new Timestamp(1755000630123L), 3),
+ new TestCase(new Date(933960600000L)),
+ new TestCase(UUID.randomUUID()),
+ new TestCase(LocalDate.of(1999, 8, 6)),
+ new TestCase(LocalTime.of(12, 12, 12)),
+ new TestCase(LocalTime.of(12, 12, 12), 6),
+ new TestCase(locDateTime),
+ new TestCase(locDateTime, 4),
+ new TestCase(OffsetTime.now()),
+ new TestCase(OffsetTime.now(), 30),
+ new TestCase(OffsetDateTime.of(locDateTime,
ZoneOffset.ofHours(3))),
+ new TestCase(new byte[]{1, 2, 3, 4})
+ );
+ }
+
+ /** */
+ private void createCacheWithValue(TestCase testCase) {
+ String clsName = testCase.value().getClass().getName();
+
+ QueryEntity qryEntity = new QueryEntity()
+ .setTableName(testCase.tableName())
+ .setKeyFieldName(KEY_COLUMN_NAME)
+ .setValueFieldName(VAL_COLUMN_NAME)
+ .addQueryField(KEY_COLUMN_NAME, Integer.class.getName(), null)
+ .addQueryField(VAL_COLUMN_NAME, clsName, null);
+
+ if (testCase.precision() != null)
+ qryEntity.setFieldsPrecision(Map.of(VAL_COLUMN_NAME,
testCase.precision()));
+
+ if (testCase.scale() != null)
+ qryEntity.setFieldsScale(Map.of(VAL_COLUMN_NAME,
testCase.scale()));
+
+ CacheConfiguration<Integer, Object> ccfg = new
CacheConfiguration<Integer, Object>(qryEntity.getTableName())
+ .setQueryEntities(Collections.singletonList(qryEntity));
+
+ try (IgniteCache<Integer, Object> cache = src.getOrCreateCache(ccfg)) {
+ cache.put(1, testCase.value());
+ }
+ }
+
+ /** */
+ private void checkTable(TestCase testCase) {
+ selectOnPostgreSqlAndAct(postgres, "SELECT * FROM " +
testCase.tableName(), (res) -> {
+ assertTrue(res.next());
+
+ checkValue(res, testCase.value());
+
+ ResultSetMetaData meta = res.getMetaData();
+
+ String actTypeName = meta.getColumnTypeName(2);
+
+ checkType(testCase.value(), actTypeName);
+
+ if (testCase.precision() != null)
+ checkValueMeta(meta, testCase.value(), testCase.precision(),
testCase.scale());
+
+ return true;
+ });
+ }
+
+ /** */
+ private void checkValue(ResultSet res, Object val) throws SQLException {
+ if (val instanceof Boolean)
+ assert Objects.equals(res.getBoolean(VAL_COLUMN_NAME), val);
+ else if (val instanceof Double)
+ assert Objects.equals(res.getDouble(VAL_COLUMN_NAME), val);
+ else if (val instanceof Float)
+ assert Objects.equals(res.getFloat(VAL_COLUMN_NAME), val);
+ else if (val instanceof BigDecimal)
+ assert Objects.equals(res.getBigDecimal(VAL_COLUMN_NAME), val);
+ else if (val instanceof Timestamp )
+ assert Objects.equals(res.getTimestamp(VAL_COLUMN_NAME), val);
+ else if (val instanceof LocalDateTime)
+ assert Objects.equals(res.getTimestamp(VAL_COLUMN_NAME),
Timestamp.valueOf((LocalDateTime)val));
+ else if (val instanceof Date)
+ assert Objects.equals(res.getTimestamp(VAL_COLUMN_NAME).getTime(),
((Date)val).getTime());
+ else if (val instanceof OffsetDateTime)
+ assert Objects.equals(res.getObject(VAL_COLUMN_NAME,
OffsetDateTime.class).withOffsetSameInstant(ZoneOffset.UTC),
+ ((OffsetDateTime)val).withOffsetSameInstant(ZoneOffset.UTC));
+ else if (val instanceof byte[])
+ assert Arrays.equals(res.getBytes(VAL_COLUMN_NAME), (byte[])val);
+ else
+ assert Objects.equals(res.getString(VAL_COLUMN_NAME),
val.toString());
+ }
+
+ /**
+ * Verifies that the PostgreSQL type name matches the expected mapping for
the given Java value.
+ *
+ * @param val Java value to check.
+ * @param actTypeName Actual PostgreSQL type name from metadata.
+ */
+ private void checkType(Object val, String actTypeName) {
+ String actual = actTypeName.toLowerCase();
+ String expected = TYPE_MAPPING.get(val.getClass());
+
+ if (expected == null) {
+ expected = javaToSqlTypeMapper
+ .renderSqlType(val.getClass().getName(), null, null)
+ .toLowerCase();
+ }
+
+ assert Objects.equals(actual, expected);
+ }
+
+ /**
+ * Verifies that the precision and scale reported by {@link
ResultSetMetaData}
+ * match the expected values for the given value type.
+ *
+ * @param meta The result set metadata to check.
+ * @param val The value whose type determines expected
precision/scale.
+ * @param precision The expected base precision (without temporal
adjustments).
+ * @param scale The expected scale, or {@code null} if not applicable.
+ * @throws SQLException If metadata retrieval fails.
+ */
+ private void checkValueMeta(ResultSetMetaData meta, Object val, Integer
precision, Integer scale) throws SQLException {
+ int actualPrecision = meta.getPrecision(2);
+ int actualScale = meta.getScale(2);
+
+ int expectedPrecision = precision;
+
+ if (val instanceof Time || val instanceof LocalTime)
+ expectedPrecision += PRECISION_EXTRA_TIME;
+ else if (val instanceof LocalDateTime || val instanceof Date)
+ expectedPrecision += PRECISION_EXTRA_DATE_TIME;
+
+ assert Objects.equals(actualPrecision, expectedPrecision);
+
+ if (val instanceof LocalTime || val instanceof LocalDateTime || val
instanceof Date)
+ assert Objects.equals(actualScale, precision);
+ else
+ assert scale == null || Objects.equals(actualScale, scale);
+ }
+
+ /** */
+ private static class TestCase {
+ /** */
+ private final Object val;
+
+ /** */
+ private Integer precision = null;
+
+ /** */
+ private Integer scale = null;
+
+ /** */
+ private TestCase(Object val) {
+ this.val = val;
+ }
+
+ /** */
+ private TestCase(Object val, Integer precision) {
+ this.val = val;
+ this.precision = precision;
+ }
+
+ /** */
+ private TestCase(Object val, Integer precision, Integer scale) {
+ this.val = val;
+ this.precision = precision;
+ this.scale = scale;
+ }
+
+ /** */
+ public Object value() {
+ return val;
+ }
+
+ /** */
+ public Integer precision() {
+ return precision;
+ }
+
+ /** */
+ public Integer scale() {
+ return scale;
+ }
+
+ /** */
+ public String tableName() {
+ return val.getClass().getName().replace('.', '_').replace('[', '_')
+ + (precision != null ? '_' + precision : "") + (scale != null
? '_' + scale : "");
+ }
+ }
+}