This is an automated email from the ASF dual-hosted git repository.

timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git


The following commit(s) were added to refs/heads/master by this push:
     new a6027e64 IGNITE-25774 Add Postgres CDC consumer (#311)
a6027e64 is described below

commit a6027e64669b0af63a126f794fc1b1ab8bfff7d6
Author: Maksim Davydov <[email protected]>
AuthorDate: Fri Aug 1 15:37:44 2025 +0300

    IGNITE-25774 Add Postgres CDC consumer (#311)
---
 modules/cdc-ext/pom.xml                            |  26 +
 .../postgresql/IgniteToPostgreSqlCdcApplier.java   | 672 ++++++++++++++++++++
 .../postgresql/IgniteToPostgreSqlCdcConsumer.java  | 246 ++++++++
 .../cdc/kafka/CdcKafkaReplicationAppsTest.java     |   2 +-
 .../CdcPostgreSqlReplicationAbstractTest.java      | 178 ++++++
 .../postgres/CdcPostgreSqlReplicationAppsTest.java |  45 ++
 .../cdc/postgres/CdcPostgreSqlReplicationTest.java | 699 +++++++++++++++++++++
 .../resources/replication/ignite-to-postgres.xml   |  84 +++
 parent-internal/pom.xml                            |   2 +
 9 files changed, 1953 insertions(+), 1 deletion(-)
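
For reference, a minimal wiring sketch for the consumer added by this commit (not part of the commit itself; the DataSource setup is illustrative and assumes pgjdbc's PGSimpleDataSource, and the URL and cache name are placeholders — only setters shown in IgniteToPostgreSqlCdcConsumer below are used):

    import java.util.Collections;
    import org.apache.ignite.cdc.CdcConfiguration;
    import org.apache.ignite.cdc.postgresql.IgniteToPostgreSqlCdcConsumer;
    import org.postgresql.ds.PGSimpleDataSource;

    /** Hypothetical wiring sketch. */
    public class PostgresCdcSetup {
        public static void main(String[] args) {
            PGSimpleDataSource ds = new PGSimpleDataSource();
            ds.setURL("jdbc:postgresql://localhost:5432/postgres"); // placeholder URL

            IgniteToPostgreSqlCdcConsumer cnsmr = new IgniteToPostgreSqlCdcConsumer()
                .setDataSource(ds)
                .setCaches(Collections.singleton("T1"))
                .setOnlyPrimary(true)
                .setBatchSize(1024)
                .setCreateTables(true);

            CdcConfiguration cdcCfg = new CdcConfiguration();
            cdcCfg.setConsumer(cnsmr);
            // cdcCfg is then passed to the CDC process (CdcMain), as the tests below show.
        }
    }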

diff --git a/modules/cdc-ext/pom.xml b/modules/cdc-ext/pom.xml
index c8052fe2..014db80c 100644
--- a/modules/cdc-ext/pom.xml
+++ b/modules/cdc-ext/pom.xml
@@ -164,8 +164,34 @@
             <version>${slf4j.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>io.zonky.test</groupId>
+            <artifactId>embedded-postgres</artifactId>
+            <version>${embedded.postgres.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-dbcp2</artifactId>
+            <version>${commons.dbcp2.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>io.zonky.test.postgres</groupId>
+                <artifactId>embedded-postgres-binaries-bom</artifactId>
+                <version>14.5.0</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
     <build>
         <plugins>
             <plugin>
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/IgniteToPostgreSqlCdcApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/IgniteToPostgreSqlCdcApplier.java
new file mode 100644
index 00000000..4399880f
--- /dev/null
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/IgniteToPostgreSqlCdcApplier.java
@@ -0,0 +1,672 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.postgresql;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import javax.sql.DataSource;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.CacheEntryVersion;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cdc.CdcCacheEvent;
+import org.apache.ignite.cdc.CdcEvent;
+import org.apache.ignite.internal.util.typedef.F;
+
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.UNDEFINED_CACHE_ID;
+
+/** */
+public class IgniteToPostgreSqlCdcApplier {
+    /** */
+    public static final String DFLT_SQL_TYPE = "OTHER";
+
+    /** */
+    public static final Map<String, String> JAVA_TO_SQL_TYPES;
+
+    /** */
+    public static final Set<String> SQL_TYPES_WITH_PRECISION_ONLY;
+
+    /** */
+    public static final Set<String> SQL_TYPES_WITH_PRECISION_AND_SCALE;
+
+    static {
+        Map<String, String> javaToSqlTypes = new HashMap<>();
+
+        javaToSqlTypes.put("java.lang.String", "VARCHAR");
+        javaToSqlTypes.put("java.lang.Integer", "INT");
+        javaToSqlTypes.put("int", "INT");
+        javaToSqlTypes.put("java.lang.Long", "BIGINT");
+        javaToSqlTypes.put("long", "BIGINT");
+        javaToSqlTypes.put("java.lang.Boolean", "BOOLEAN");
+        javaToSqlTypes.put("boolean", "BOOLEAN");
+        javaToSqlTypes.put("java.lang.Double", "DOUBLE PRECISION");
+        javaToSqlTypes.put("double", "DOUBLE PRECISION");
+        javaToSqlTypes.put("java.lang.Float", "REAL");
+        javaToSqlTypes.put("float", "REAL");
+        javaToSqlTypes.put("java.math.BigDecimal", "DECIMAL");
+        javaToSqlTypes.put("java.lang.Short", "SMALLINT");
+        javaToSqlTypes.put("short", "SMALLINT");
+        javaToSqlTypes.put("java.lang.Byte", "SMALLINT");
+        javaToSqlTypes.put("byte", "SMALLINT");
+        javaToSqlTypes.put("java.util.UUID", "UUID");
+        javaToSqlTypes.put("[B", "BYTEA");
+        javaToSqlTypes.put("java.lang.Object", "OTHER");
+
+        JAVA_TO_SQL_TYPES = Collections.unmodifiableMap(javaToSqlTypes);
+
+        Set<String> sqlTypesWithPrecisionOnly = new HashSet<>();
+
+        sqlTypesWithPrecisionOnly.add("VARCHAR");
+        sqlTypesWithPrecisionOnly.add("DOUBLE PRECISION");
+
+        SQL_TYPES_WITH_PRECISION_ONLY = Collections.unmodifiableSet(sqlTypesWithPrecisionOnly);
+
+        Set<String> sqlTypesWithPrecisionAndScale = new HashSet<>();
+
+        sqlTypesWithPrecisionAndScale.add("DECIMAL");
+        sqlTypesWithPrecisionAndScale.add("REAL");
+
+        SQL_TYPES_WITH_PRECISION_AND_SCALE = Collections.unmodifiableSet(sqlTypesWithPrecisionAndScale);
+    }
+
+    /** */
+    private final DataSource dataSrc;
+
+    /** */
+    private final long batchSize;
+
+    /** */
+    private final IgniteLogger log;
+
+    /** */
+    private final Map<Integer, String> cacheIdToUpsertQry = new HashMap<>();
+
+    /** */
+    private final Map<Integer, String> cacheIdToDeleteQry = new HashMap<>();
+
+    /** */
+    private final Map<Integer, Set<String>> cacheIdToPrimaryKeys = new HashMap<>();
+
+    /** */
+    private final Map<Integer, Set<String>> cacheIdToFields = new HashMap<>();
+
+    /** */
+    private final Set<Object> curKeys = new HashSet<>();
+
+    /**
+     * @param dataSrc {@link DataSource} - connection pool to PostgreSql
+     * @param batchSize the number of CDC events to include in a single batch
+     * @param log the {@link IgniteLogger} instance used for logging CDC processing events
+     */
+    public IgniteToPostgreSqlCdcApplier(
+        DataSource dataSrc,
+        long batchSize,
+        IgniteLogger log
+    ) {
+        this.dataSrc = dataSrc;
+        this.batchSize = batchSize;
+        this.log = log;
+    }
+
+    /**
+     * @param evts an {@link Iterator} of {@link CdcEvent} objects to be applied
+     * @return the total number of events successfully batched and executed
+     */
+    public long applyEvents(Iterator<CdcEvent> evts) {
+        try (Connection conn = dataSrc.getConnection()) {
+            try {
+                // Even with autoCommit enabled, batched statements are committed as a whole rather than
+                // per statement. We disable autoCommit and commit manually for better control.
+                conn.setAutoCommit(false);
+
+                return applyEvents(conn, evts);
+            }
+            catch (SQLException e) {
+                conn.rollback();
+
+                throw e;
+            }
+        }
+        catch (Throwable e) {
+            log.error("Error during CDC event application: " + e.getMessage(), 
e);
+
+            throw new IgniteException("Failed to apply CDC events", e);
+        }
+    }
+
+    /**
+     * @param conn connection to PostgreSql
+     * @param evts an {@link Iterator} of {@link CdcEvent} objects to be applied
+     * @return the total number of events successfully batched and executed
+     */
+    private long applyEvents(Connection conn, Iterator<CdcEvent> evts) {
+        long evtsApplied = 0;
+
+        int currCacheId = UNDEFINED_CACHE_ID;
+        boolean prevOpIsDelete = false;
+        
+        PreparedStatement curPrepStmt = null;
+        CdcEvent evt;
+
+        while (evts.hasNext()) {
+            evt = evts.next();
+
+            if (log.isDebugEnabled())
+                log.debug("Event received [evt=" + evt + ']');
+
+            if (currCacheId != evt.cacheId() || prevOpIsDelete ^ (evt.value() == null)) {
+                if (curPrepStmt != null)
+                    evtsApplied += executeBatch(conn, curPrepStmt);
+
+                currCacheId = evt.cacheId();
+                prevOpIsDelete = evt.value() == null;
+
+                curPrepStmt = prepareStatement(conn, evt);
+            }
+
+            if (curKeys.size() >= batchSize || curKeys.contains(evt.key()))
+                evtsApplied += executeBatch(conn, curPrepStmt);
+
+            addEvent(curPrepStmt, evt);
+        }
+
+        if (!curKeys.isEmpty())
+            evtsApplied += executeBatch(conn, curPrepStmt);
+
+        return evtsApplied;
+    }
+
+    /**
+     * @param conn connection to PostgreSql
+     * @param curPrepStmt {@link PreparedStatement}
+     * @return the number of batched statements successfully executed (one statement per CdcEvent).
+     */
+    private int executeBatch(Connection conn, PreparedStatement curPrepStmt) {
+        try {
+            curKeys.clear();
+
+            if (log.isDebugEnabled())
+                log.debug("Applying batch " + curPrepStmt.toString());
+
+            if (!curPrepStmt.isClosed()) {
+                int batchSize = curPrepStmt.executeBatch().length;
+
+                // It's better to use autoCommit = false and call commit() manually for improved performance and
+                // clearer transaction boundaries
+                conn.commit();
+
+                return batchSize;
+            }
+
+            throw new IgniteException("Tried to execute on closed prepared 
statement!");
+        }
+        catch (SQLException e) {
+            log.error(e.getMessage(), e);
+
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * @param conn connection to PostgreSql
+     * @param evt {@link CdcEvent}
+     * @return relevant {@link PreparedStatement}
+     */
+    private PreparedStatement prepareStatement(Connection conn, CdcEvent evt) {
+        String sqlQry;
+
+        if (evt.value() == null)
+            sqlQry = cacheIdToDeleteQry.get(evt.cacheId());
+        else
+            sqlQry = cacheIdToUpsertQry.get(evt.cacheId());
+
+        if (sqlQry == null)
+            throw new IgniteException("No SQL query is found for cacheId=" + 
evt.cacheId());
+
+        if (log.isDebugEnabled())
+            log.debug("Statement updated [cacheId=" + evt.cacheId() + ", 
sqlQry=" + sqlQry + ']');
+
+        try {
+            return conn.prepareStatement(sqlQry);
+        }
+        catch (SQLException e) {
+            log.error(e.getMessage(), e);
+
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * @param curPrepStmt current {@link PreparedStatement}
+     * @param evt {@link CdcEvent}
+     */
+    private void addEvent(PreparedStatement curPrepStmt, CdcEvent evt) {
+        try {
+            Iterator<String> itFields = evt.value() == null ?
+                cacheIdToPrimaryKeys.get(evt.cacheId()).iterator() :
+                cacheIdToFields.get(evt.cacheId()).iterator();
+
+            String field;
+
+            BinaryObject keyObj = (evt.key() instanceof BinaryObject) ? (BinaryObject)evt.key() : null;
+            BinaryObject valObj = (evt.value() instanceof BinaryObject) ? (BinaryObject)evt.value() : null;
+
+            int idx = 1;
+            Object obj;
+
+            while (itFields.hasNext()) {
+                field = itFields.next();
+
+                if (cacheIdToPrimaryKeys.get(evt.cacheId()).contains(field))
+                    obj = keyObj != null ? keyObj.field(field) : evt.key();
+                else
+                    obj = valObj != null ? valObj.field(field) : evt.value();
+
+                addObject(curPrepStmt, idx, obj);
+
+                idx++;
+            }
+
+            if (evt.value() != null)
+                curPrepStmt.setBytes(idx, encodeVersion(evt.version()));
+
+            curKeys.add(evt.key());
+
+            curPrepStmt.addBatch();
+        }
+        catch (Throwable e) {
+            log.error(e.getMessage(), e);
+
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * Sets a value in the PreparedStatement at the given index using the appropriate setter
+     * based on the runtime type of the object.
+     * @param curPrepStmt {@link PreparedStatement}
+     * @param idx value index in {@link PreparedStatement}
+     * @param obj value
+     */
+    private void addObject(PreparedStatement curPrepStmt, int idx, Object obj) throws SQLException {
+        if (obj == null) {
+            curPrepStmt.setObject(idx, null);
+
+            return;
+        }
+
+        if (obj instanceof String)
+            curPrepStmt.setString(idx, (String)obj);
+        else if (obj instanceof Integer)
+            curPrepStmt.setInt(idx, (Integer)obj);
+        else if (obj instanceof Long)
+            curPrepStmt.setLong(idx, (Long)obj);
+        else if (obj instanceof Short)
+            curPrepStmt.setShort(idx, (Short)obj);
+        else if (obj instanceof Byte)
+            curPrepStmt.setByte(idx, (Byte)obj);
+        else if (obj instanceof Boolean)
+            curPrepStmt.setBoolean(idx, (Boolean)obj);
+        else if (obj instanceof Float)
+            curPrepStmt.setFloat(idx, (Float)obj);
+        else if (obj instanceof Double)
+            curPrepStmt.setDouble(idx, (Double)obj);
+        else if (obj instanceof BigDecimal)
+            curPrepStmt.setBigDecimal(idx, (BigDecimal)obj);
+        else if (obj instanceof UUID)
+            curPrepStmt.setObject(idx, obj, Types.OTHER); // PostgreSQL expects UUID as OTHER
+        else if (obj instanceof byte[])
+            curPrepStmt.setBytes(idx, (byte[])obj);
+        else
+            curPrepStmt.setObject(idx, obj);
+    }
+
+    /**
+     * @param evts an {@link Iterator} of {@link CdcCacheEvent} objects to apply
+     * @param createTables tables creation flag. If {@code true}, an attempt to create tables will be made.
+     * @return Number of applied events.
+     */
+    public long applyCacheEvents(Iterator<CdcCacheEvent> evts, boolean createTables) {
+        CdcCacheEvent evt;
+        QueryEntity entity;
+
+        long cnt = 0;
+
+        while (evts.hasNext()) {
+            evt = evts.next();
+
+            if (evt.queryEntities().size() != 1)
+                throw new IgniteException("There should be exactly 1 
QueryEntity for cacheId: " + evt.cacheId());
+
+            entity = evt.queryEntities().iterator().next();
+
+            if (createTables)
+                createTableIfNotExists(entity);
+
+            cacheIdToUpsertQry.put(evt.cacheId(), getUpsertSqlQry(entity));
+
+            cacheIdToDeleteQry.put(evt.cacheId(), getDeleteSqlQry(entity));
+
+            cacheIdToPrimaryKeys.put(evt.cacheId(), getPrimaryKeys(entity));
+
+            cacheIdToFields.put(evt.cacheId(), entity.getFields().keySet());
+
+            if (createTables && log.isInfoEnabled())
+                log.info("Cache table created [tableName=" + 
entity.getTableName() +
+                    ", columns=" + entity.getFields().keySet() + ']');
+
+            cnt++;
+        }
+
+        return cnt;
+    }
+
+    /**
+     * @param entity the {@link QueryEntity} describing the table schema to create
+     */
+    private void createTableIfNotExists(QueryEntity entity) {
+        String createSqlStmt = getCreateTableSqlStatement(entity);
+
+        try (Connection conn = dataSrc.getConnection(); Statement stmt = conn.createStatement()) {
+            stmt.execute(createSqlStmt);
+        }
+        catch (SQLException e) {
+            log.error(e.getMessage(), e);
+
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * Generates the SQL statement for creating a table.
+     *
+     * @param entity QueryEntity instance describing the cache structure.
+     * @return SQL statement for creating a table.
+     */
+    private String getCreateTableSqlStatement(QueryEntity entity) {
+        StringBuilder ddl = new StringBuilder("CREATE TABLE IF NOT EXISTS ").append(entity.getTableName()).append(" (");
+
+        addFieldsAndTypes(entity, ddl);
+
+        ddl.append(", version BYTEA NOT NULL");
+
+        ddl.append(", PRIMARY KEY (");
+
+        addPrimaryKeys(entity, ddl);
+
+        ddl.append(')').append(')');
+
+        return ddl.toString();
+    }
+
+    /**
+     * Constructs DDL-compatible SQL fragment listing fields along with their mapped SQL types.
+     *
+     * @param entity QueryEntity instance describing the cache structure.
+     * @param sql Target StringBuilder where the result will be appended.
+     */
+    private void addFieldsAndTypes(QueryEntity entity, StringBuilder sql) {
+        Iterator<Map.Entry<String, String>> iter = entity.getFields().entrySet().iterator();
+
+        Map.Entry<String, String> field;
+        String type;
+
+        Integer precision;
+        Integer scale;
+
+        while (iter.hasNext()) {
+            field = iter.next();
+            type = JAVA_TO_SQL_TYPES.getOrDefault(field.getValue(), DFLT_SQL_TYPE);
+
+            sql.append(field.getKey()).append(" ").append(type);
+
+            precision = entity.getFieldsPrecision().get(field.getKey());
+            scale = entity.getFieldsScale().get(field.getKey());
+
+            if (precision != null && precision > 0) {
+                if (SQL_TYPES_WITH_PRECISION_ONLY.contains(type))
+                    sql.append("(").append(precision).append(")");
+                else if (SQL_TYPES_WITH_PRECISION_AND_SCALE.contains(type)) {
+                    sql.append("(").append(precision);
+
+                    if (scale != null && scale >= 0)
+                        sql.append(", ").append(scale);
+
+                    sql.append(")");
+                }
+            }
+
+            if (iter.hasNext())
+                sql.append(", ");
+        }
+    }
+
+    /**
+     * Generates a parameterized SQL UPSERT (INSERT ... ON CONFLICT DO UPDATE) query
+     * for the given {@link QueryEntity}, including a version-based conflict resolution condition.
+     * <pre>{@code
+     * INSERT INTO my_table (id, name, version) VALUES (?, ?, ?)
+     * ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name
+     * WHERE version < EXCLUDED.version
+     * }</pre>
+     *
+     * Notes:
+     * <ul>
+     *   <li>The {@code version} field is added to support version-based upsert logic.</li>
+     *   <li>Primary key fields are excluded from the {@code DO UPDATE SET} clause.</li>
+     *   <li>All fields are assigned {@code ?} placeholders for use with {@link PreparedStatement}.</li>
+     * </ul>
+     *
+     * @param entity the {@link QueryEntity} describing the table, fields, and primary keys
+     * @return a SQL UPSERT query string with parameter placeholders and version conflict resolution
+     */
+    private String getUpsertSqlQry(QueryEntity entity) {
+        StringBuilder sql = new StringBuilder("INSERT INTO 
").append(entity.getTableName()).append(" (");
+
+        addFields(entity, sql);
+
+        sql.append(", version) VALUES (");
+
+        for (int i = 0; i < entity.getFields().size() + 1; ++i) { // version field included
+            sql.append('?');
+
+            if (i < entity.getFields().size())
+                sql.append(", ");
+        }
+
+        sql.append(") ON CONFLICT (");
+
+        addPrimaryKeys(entity, sql);
+
+        sql.append(") DO UPDATE SET ");
+
+        addUpdateFields(entity, sql);
+
+        sql.append(" WHERE ").append(entity.getTableName()).append(".version < 
EXCLUDED.version");
+
+        return sql.toString();
+    }
+
+    /**
+     * Builds a comma-separated list of field names extracted from the QueryEntity.
+     *
+     * @param entity QueryEntity instance describing the cache structure.
+     * @param sql Target StringBuilder where the result will be appended.
+     */
+    private void addFields(QueryEntity entity, StringBuilder sql) {
+        Iterator<Map.Entry<String, String>> iter = entity.getFields().entrySet().iterator();
+        Map.Entry<String, String> field;
+
+        while (iter.hasNext()) {
+            field = iter.next();
+
+            sql.append(field.getKey());
+
+            if (iter.hasNext())
+                sql.append(", ");
+        }
+    }
+
+    /**
+     * Builds the SQL update clause over all non-primary-key fields, including the version field.
+     *
+     * @param entity QueryEntity instance describing the cache structure.
+     * @param sql Target StringBuilder where the resulting SQL fragment will be appended.
+     */
+    private void addUpdateFields(QueryEntity entity, StringBuilder sql) {
+        Set<String> primaryFields = getPrimaryKeys(entity);
+
+        Iterator<String> itAllFields = F.concat(false, "version", entity.getFields().keySet()).iterator();
+
+        String field;
+
+        boolean first = true;
+
+        while (itAllFields.hasNext()) {
+            field = itAllFields.next();
+
+            if (primaryFields.contains(field))
+                continue;
+
+            if (!first)
+                sql.append(", ");
+
+            sql.append(field).append(" = EXCLUDED.").append(field);
+
+            if (first)
+                first = false;
+        }
+    }
+
+    /**
+     * Generates a parameterized SQL DELETE query for the given {@link QueryEntity}.
+     * Example:
+     * <pre>{@code
+     * // For a key: id
+     * DELETE FROM my_table WHERE (id = ?)
+     * }</pre>
+     *
+     * If the table has a composite primary key, all keys will be included with AND conditions:
+     * <pre>{@code
+     * // For a composite key: id1, id2
+     * DELETE FROM my_table WHERE (id1 = ? AND id2 = ?)
+     * }</pre>
+     *
+     * @param entity the {@link QueryEntity} describing the table and its primary keys
+     * @return a SQL DELETE query string with parameter placeholders for primary key values
+     */
+    private String getDeleteSqlQry(QueryEntity entity) {
+        StringBuilder deleteQry = new StringBuilder("DELETE FROM ").append(entity.getTableName()).append(" WHERE (");
+
+        Iterator<String> itKeys = getPrimaryKeys(entity).iterator();
+        String key;
+
+        while (itKeys.hasNext()) {
+            key = itKeys.next();
+
+            deleteQry.append(key).append(" = ?");
+
+            if (itKeys.hasNext())
+                deleteQry.append(" AND ");
+        }
+
+        deleteQry.append(')');
+
+        return deleteQry.toString();
+    }
+
+    /**
+     * Generates a SQL fragment listing primary key fields for the given QueryEntity.
+     *
+     * @param entity QueryEntity instance describing the cache structure.
+     * @param sql Target StringBuilder where the resulting SQL fragment will be appended.
+     */
+    private void addPrimaryKeys(QueryEntity entity, StringBuilder sql) {
+        Iterator<String> iterKeys = getPrimaryKeys(entity).iterator();
+
+        while (iterKeys.hasNext()) {
+            sql.append(iterKeys.next());
+
+            if (iterKeys.hasNext())
+                sql.append(", ");
+        }
+    }
+
+    /**
+     * Retrieves the primary key field names from the provided {@link QueryEntity}.
+     * If no key fields are defined, a singleton set with the entity's key field name is returned.
+     *
+     * @param entity The {@link QueryEntity} representing the table and its metadata.
+     * @return A set of primary key field names, or a singleton set with the key field name if none are defined.
+     */
+    private Set<String> getPrimaryKeys(QueryEntity entity) {
+        Set<String> keys = entity.getKeyFields();
+
+        if (keys == null || keys.isEmpty()) {
+            if (entity.getKeyFieldName() == null)
+                throw new IgniteException("Couldn't determine key field for 
queryEntity [tableName=" +
+                    entity.getTableName() + ']');
+
+            return Collections.singleton(entity.getKeyFieldName());
+        }
+
+        return keys;
+    }
+
+    /**
+     * Encodes the components of a {@link CacheEntryVersion} into a 16-byte array
+     * using big-endian byte order for compact and lexicographically comparable storage.
+     * <p>
+     * The encoding format is:
+     * <ul>
+     *   <li>4 bytes — {@code topologyVersion} (int)</li>
+     *   <li>8 bytes — {@code order} (long)</li>
+     *   <li>4 bytes — {@code nodeOrder} (int)</li>
+     * </ul>
+     * This format ensures that the resulting {@code byte[]} can be compared
+     * lexicographically to determine version ordering (i.e., a larger byte array
+     * represents a newer version), which is compatible with PostgreSQL {@code BYTEA}
+     * comparison semantics.
+     *
+     * @param ver the {@link CacheEntryVersion} instance to encode
+     * @return a 16-byte array representing the version in big-endian format
+     */
+    private byte[] encodeVersion(CacheEntryVersion ver) {
+        ByteBuffer buf = ByteBuffer.allocate(16).order(ByteOrder.BIG_ENDIAN);
+
+        buf.putInt(ver.topologyVersion());
+        buf.putLong(ver.order());
+        buf.putInt(ver.nodeOrder());
+
+        return buf.array();
+    }
+}
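
To make the ordering claim in encodeVersion() concrete, here is a standalone sketch (not part of the commit) checking that the big-endian encoding compares the way PostgreSQL compares BYTEA values; Arrays.compareUnsigned (Java 9+) performs the same unsigned lexicographic comparison:

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;
    import java.util.Arrays;

    /** Sketch: the 16-byte big-endian version encoding is order-preserving. */
    public class VersionOrderCheck {
        public static void main(String[] args) {
            byte[] older = encode(1, 100L, 1); // topologyVersion=1, order=100, nodeOrder=1
            byte[] newer = encode(1, 101L, 1); // same topology, later order counter

            // Prints 'true': the older version sorts before the newer one,
            // so 'WHERE version < EXCLUDED.version' keeps only newer updates.
            System.out.println(Arrays.compareUnsigned(older, newer) < 0);
        }

        /** Mirrors the layout used above: 4-byte int, 8-byte long, 4-byte int. */
        static byte[] encode(int topVer, long order, int nodeOrder) {
            return ByteBuffer.allocate(16).order(ByteOrder.BIG_ENDIAN)
                .putInt(topVer).putLong(order).putInt(nodeOrder).array();
        }
    }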
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/IgniteToPostgreSqlCdcConsumer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/IgniteToPostgreSqlCdcConsumer.java
new file mode 100644
index 00000000..c24fee44
--- /dev/null
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/IgniteToPostgreSqlCdcConsumer.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.postgresql;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.stream.Collectors;
+import javax.sql.DataSource;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryType;
+import org.apache.ignite.cdc.CdcCacheEvent;
+import org.apache.ignite.cdc.CdcConsumer;
+import org.apache.ignite.cdc.CdcEvent;
+import org.apache.ignite.cdc.TypeMapping;
+import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
+import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.metric.MetricRegistry;
+import org.apache.ignite.resources.LoggerResource;
+
+/**
+ * This class represents a consumer component that replicates cache changes from Apache Ignite to PostgreSQL using
+ * the Change Data Capture (CDC) mechanism. It applies events to PostgreSQL via batch-prepared SQL statements, ensuring
+ * efficient handling of large volumes of updates.
+ *
+ * <p>Additionally, it provides methods for initializing connections, managing transactions, and performing atomic batches
+ * of writes.</p>
+ */
+public class IgniteToPostgreSqlCdcConsumer implements CdcConsumer {
+    /** */
+    public static final String EVTS_SENT_CNT = "EventsCount";
+
+    /** */
+    public static final String EVTS_SENT_CNT_DESC = "Count of events applied to PostgreSQL";
+
+    /** */
+    public static final String LAST_EVT_SENT_TIME = "LastEventTime";
+
+    /** */
+    public static final String LAST_EVT_SENT_TIME_DESC = "Timestamp of last applied event to PostgreSQL";
+
+    /** */
+    private static final boolean DFLT_IS_ONLY_PRIMARY = true;
+
+    /** */
+    private static final long DFLT_BATCH_SIZE = 1024;
+
+    /** */
+    private static final boolean DFLT_CREATE_TABLES = false;
+
+    /** */
+    private DataSource dataSrc;
+
+    /** Collection of cache names which will be replicated to PostgreSQL. */
+    private Collection<String> caches;
+
+    /** */
+    private boolean onlyPrimary = DFLT_IS_ONLY_PRIMARY;
+
+    /** */
+    private long batchSize = DFLT_BATCH_SIZE;
+
+    /** */
+    private boolean createTables = DFLT_CREATE_TABLES;
+
+    /** Log. */
+    @LoggerResource
+    private IgniteLogger log;
+
+    /** Cache IDs. */
+    private Set<Integer> cachesIds;
+
+    /** Applier instance responsible for applying individual CDC events to PostgreSQL. */
+    private IgniteToPostgreSqlCdcApplier applier;
+
+    /** Count of events applied to PostgreSQL. */
+    private AtomicLongMetric evtsCnt;
+
+    /** Timestamp of last applied batch to PostgreSQL. */
+    private AtomicLongMetric lastEvtTs;
+
+    /** {@inheritDoc} */
+    @Override public void start(MetricRegistry reg) {
+        A.notNull(dataSrc, "dataSource");
+        A.notEmpty(caches, "caches");
+        A.ensure(batchSize > 0, "batchSize");
+
+        cachesIds = caches.stream()
+            .map(CU::cacheId)
+            .collect(Collectors.toSet());
+
+        applier = new IgniteToPostgreSqlCdcApplier(dataSrc, batchSize, log);
+
+        MetricRegistryImpl mreg = (MetricRegistryImpl)reg;
+
+        this.evtsCnt = mreg.longMetric(EVTS_SENT_CNT, EVTS_SENT_CNT_DESC);
+        this.lastEvtTs = mreg.longMetric(LAST_EVT_SENT_TIME, LAST_EVT_SENT_TIME_DESC);
+
+        if (log.isInfoEnabled())
+            log.info("CDC Ignite to PostgreSQL start-up [cacheIds=" + 
cachesIds + ']');
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onEvents(Iterator<CdcEvent> events) {
+        Iterator<CdcEvent> filtered = F.iterator(
+            events,
+            F.identity(),
+            true,
+            evt -> !onlyPrimary || evt.primary(),
+            evt -> cachesIds.contains(evt.cacheId()));
+
+        long evtsSent = applier.applyEvents(filtered);
+
+        if (evtsSent > 0) {
+            evtsCnt.add(evtsSent);
+            lastEvtTs.value(System.currentTimeMillis());
+
+            if (log.isInfoEnabled())
+                log.info("Events applied [evtsApplied=" + evtsCnt.value() + 
']');
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onTypes(Iterator<BinaryType> types) {
+        types.forEachRemaining(e -> {
+            // Just skip. Handling of binary types is not supported.
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onMappings(Iterator<TypeMapping> mappings) {
+        mappings.forEachRemaining(e -> {
+            // Just skip. Handling of type mappings is not supported.
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onCacheChange(Iterator<CdcCacheEvent> cacheEvents) {
+        Iterator<CdcCacheEvent> filtered = F.iterator(
+            cacheEvents,
+            F.identity(),
+            true,
+            evt -> cachesIds.contains(evt.cacheId()));
+
+        long tablesCreated = applier.applyCacheEvents(filtered, createTables);
+
+        if (createTables && tablesCreated > 0 && log.isInfoEnabled())
+            log.info("Cache changes applied [tablesCreatedCnt=" + 
tablesCreated + ']');
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onCacheDestroy(Iterator<Integer> caches) {
+        caches.forEachRemaining(e -> {
+            // Just skip. Handling of cache destroy events is not supported.
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop() {
+
+    }
+
+    /**
+     * Sets the datasource configuration for connecting to the PostgreSQL database.
+     *
+     * @param dataSrc Configured data source.
+     * @return {@code this} for chaining.
+     */
+    public IgniteToPostgreSqlCdcConsumer setDataSource(DataSource dataSrc) {
+        this.dataSrc = dataSrc;
+
+        return this;
+    }
+
+    /**
+     * Sets cache names to replicate.
+     *
+     * @param caches Cache names.
+     * @return {@code this} for chaining.
+     */
+    public IgniteToPostgreSqlCdcConsumer setCaches(Set<String> caches) {
+        this.caches = Collections.unmodifiableSet(caches);
+
+        return this;
+    }
+
+    /**
+     * Enables/disables filtering to accept only primary-node originated events.
+     *
+     * @param onlyPrimary True to restrict replication to primary events only.
+     * @return {@code this} for chaining.
+     */
+    public IgniteToPostgreSqlCdcConsumer setOnlyPrimary(boolean onlyPrimary) {
+        this.onlyPrimary = onlyPrimary;
+
+        return this;
+    }
+
+    /**
+     * Sets the batch size that will be submitted to PostgreSQL.
+     * <p>
+     * This setting controls how many statements are sent in a single {@link java.sql.PreparedStatement#executeBatch()} call.
+     * <p>
+     *
+     * @param batchSize number of statements per batch.
+     * @return {@code this} for chaining.
+     */
+    public IgniteToPostgreSqlCdcConsumer setBatchSize(int batchSize) {
+        this.batchSize = batchSize;
+
+        return this;
+    }
+
+    /**
+     * Enables/disables creation of tables by this instance.
+     *
+     * @param createTables True to create tables on start-up
+     * @return {@code this} for chaining.
+     */
+    public IgniteToPostgreSqlCdcConsumer setCreateTables(boolean createTables) {
+        this.createTables = createTables;
+
+        return this;
+    }
+}
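
For readers unfamiliar with Ignite's F.iterator helper used in onEvents above: it lazily applies the given predicates (all must pass) while iterating. A plain-Java equivalent of that filter, as a hypothetical eager helper (not part of the commit):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Set;
    import org.apache.ignite.cdc.CdcEvent;

    /** Sketch: eager equivalent of the lazy filtered iterator built in onEvents. */
    public class EventFilterSketch {
        static List<CdcEvent> filterEvents(Iterator<CdcEvent> events, boolean onlyPrimary, Set<Integer> cachesIds) {
            List<CdcEvent> accepted = new ArrayList<>();

            while (events.hasNext()) {
                CdcEvent evt = events.next();

                // Keep the event only if it comes from a primary node (unless
                // onlyPrimary is disabled) and its cache id is in the replicated set.
                if ((!onlyPrimary || evt.primary()) && cachesIds.contains(evt.cacheId()))
                    accepted.add(evt);
            }

            return accepted;
        }
    }

Unlike this eager sketch, the lazy iterator lets the applier batch and commit while the WAL is still being consumed.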
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java
index 927a7b25..ef4e1400 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java
@@ -191,7 +191,7 @@ public class CdcKafkaReplicationAppsTest extends CdcKafkaReplicationTest {
     }
 
     /** */
-    private String prepareConfig(String path, Map<String, String> params) {
+    public static String prepareConfig(String path, Map<String, String> params) {
         try {
+            String cfg = new String(Files.readAllBytes(Paths.get(CdcKafkaReplicationAppsTest.class.getResource(path).toURI())));
 
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgres/CdcPostgreSqlReplicationAbstractTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgres/CdcPostgreSqlReplicationAbstractTest.java
new file mode 100644
index 00000000..57da44b5
--- /dev/null
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgres/CdcPostgreSqlReplicationAbstractTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.postgres;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Set;
+import javax.sql.DataSource;
+import io.zonky.test.db.postgres.embedded.EmbeddedPostgres;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cdc.CdcConfiguration;
+import org.apache.ignite.cdc.postgresql.IgniteToPostgreSqlCdcConsumer;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+
+/** */
+public class CdcPostgreSqlReplicationAbstractTest extends GridCommonAbstractTest {
+    /** */
+    protected static final int BATCH_SIZE = 128;
+
+    /** */
+    protected static final int KEYS_CNT = 1024;
+
+    /** */
+    protected void executeOnIgnite(IgniteEx src, String sqlText, Object... args) {
+        SqlFieldsQuery qry = new SqlFieldsQuery(sqlText).setArgs(args);
+
+        try (FieldsQueryCursor<List<?>> cursor = src.context().query().querySqlFields(qry, true)) {
+            cursor.getAll();
+        }
+    }
+
+    /** */
+    protected ResultSet selectOnPostgreSql(EmbeddedPostgres postgres, String qry) {
+        try (Connection conn = postgres.getPostgresDatabase().getConnection()) {
+            PreparedStatement stmt = conn.prepareStatement(qry);
+
+            return stmt.executeQuery();
+        }
+        catch (SQLException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /** */
+    protected void executeOnPostgreSql(EmbeddedPostgres postgres, String qry) {
+        try (Connection conn = postgres.getPostgresDatabase().getConnection()) {
+            PreparedStatement stmt = conn.prepareStatement(qry);
+
+            stmt.executeUpdate();
+        }
+        catch (SQLException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /** */
+    protected boolean checkRow(
+        EmbeddedPostgres postgres,
+        String tableName,
+        String columnName,
+        String expected,
+        String condition
+    ) {
+        String qry = "SELECT " + columnName + " FROM " + tableName + " WHERE " 
+ condition;
+
+        try (ResultSet res = selectOnPostgreSql(postgres, qry)) {
+            if (res.next()) {
+                String actual = res.getString(columnName);
+
+                return expected.equals(actual);
+            }
+
+            return false;
+        }
+        catch (Exception e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /** */
+    protected GridAbsPredicate waitForTablesCreatedOnPostgres(EmbeddedPostgres postgres, Set<String> caches) {
+        return () -> {
+            String sql = "SELECT EXISTS (" +
+                "  SELECT 1 FROM information_schema.tables " +
+                "  WHERE table_name = '%s'" +
+                ")";
+
+            for (String cache : caches) {
+                try (ResultSet rs = selectOnPostgreSql(postgres, String.format(sql, cache.toLowerCase()))) {
+                    rs.next();
+
+                    if (!rs.getBoolean(1))
+                        return false;
+                }
+                catch (SQLException e) {
+                    log.error(e.getMessage(), e);
+
+                    throw new IgniteException(e);
+                }
+            }
+
+            return true;
+        };
+    }
+
+    /** */
+    protected GridAbsPredicate waitForTableSize(EmbeddedPostgres postgres, String tableName, long expSz) {
+        return () -> {
+            try (ResultSet res = selectOnPostgreSql(postgres, "SELECT COUNT(*) FROM " + tableName)) {
+                res.next();
+
+                long cnt = res.getLong(1);
+
+                return cnt == expSz;
+            }
+            catch (SQLException e) {
+                throw new IgniteException(e);
+            }
+        };
+    }
+
+    /** */
+    protected IgniteToPostgreSqlCdcConsumer getCdcConsumerConfiguration() {
+        return new IgniteToPostgreSqlCdcConsumer()
+            .setBatchSize(BATCH_SIZE)
+            .setOnlyPrimary(true)
+            .setCreateTables(false);
+    }
+
+    /**
+     * @param igniteCfg Ignite configuration.
+     * @param caches Cache name set to stream to PostgreSql.
+     * @param dataSrc Data Source.
+     * @return Future for Change Data Capture application.
+     */
+    protected IgniteInternalFuture<?> startIgniteToPostgreSqlCdcConsumer(
+        IgniteConfiguration igniteCfg,
+        Set<String> caches,
+        DataSource dataSrc
+    ) {
+        IgniteToPostgreSqlCdcConsumer cdcCnsmr = getCdcConsumerConfiguration()
+            .setCaches(caches)
+            .setDataSource(dataSrc);
+
+        CdcConfiguration cdcCfg = new CdcConfiguration();
+
+        cdcCfg.setConsumer(cdcCnsmr);
+
+        return runAsync(new CdcMain(igniteCfg, null, cdcCfg), "ignite-src-to-postgres-" + igniteCfg.getConsistentId());
+    }
+}
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgres/CdcPostgreSqlReplicationAppsTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgres/CdcPostgreSqlReplicationAppsTest.java
new file mode 100644
index 00000000..d7c39555
--- /dev/null
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgres/CdcPostgreSqlReplicationAppsTest.java
@@ -0,0 +1,45 @@
+package org.apache.ignite.cdc.postgres;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import javax.sql.DataSource;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.startup.cmdline.CdcCommandLineStartup;
+
+import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationAppsTest.prepareConfig;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+
+/** PostgreSql CDC test with .xml configuration. */
+public class CdcPostgreSqlReplicationAppsTest extends CdcPostgreSqlReplicationTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteInternalFuture<?> startIgniteToPostgreSqlCdcConsumer(
+        IgniteConfiguration igniteCfg,
+        Set<String> caches,
+        DataSource dataSrc
+    ) {
+        String cfgPath = "/replication/ignite-to-postgres.xml";
+        String threadName = "ignite-src-to-postgres-xml";
+
+        Map<String, String> params = new HashMap<>();
+
+        params.put("INSTANCE_NAME", igniteCfg.getIgniteInstanceName());
+        params.put("CONSISTENT_ID", 
String.valueOf(igniteCfg.getConsistentId()));
+
+        params.put("BATCH_SIZE", Integer.toString(BATCH_SIZE));
+        params.put("ONLY_PRIMARY", "true");
+        params.put("CREATE_TABLE", String.valueOf(createTables));
+
+        params.put("DB_URL", postgres.getJdbcUrl("postgres", "postgres"));
+        params.put("DB_USER", "postgres");
+        params.put("DB_PASSWORD", "");
+
+        return runAsync(() -> CdcCommandLineStartup.main(new String[] {prepareConfig(cfgPath, params)}), threadName);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void checkFutureEndWithError(IgniteInternalFuture<?> fut) {
+        // CdcMain error in CdcCommandLineStartup leads to 'System.exit(-1)' without showing error/cancellation in fut
+    }
+}
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgres/CdcPostgreSqlReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgres/CdcPostgreSqlReplicationTest.java
new file mode 100644
index 00000000..468ff4df
--- /dev/null
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgres/CdcPostgreSqlReplicationTest.java
@@ -0,0 +1,699 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.postgres;
+
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.IntConsumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import io.zonky.test.db.postgres.embedded.EmbeddedPostgres;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cdc.postgresql.IgniteToPostgreSqlCdcConsumer;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/** */
+@RunWith(Parameterized.class)
+public class CdcPostgreSqlReplicationTest extends CdcPostgreSqlReplicationAbstractTest {
+    /** */
+    private static final int BACKUP = 0;
+
+    /** */
+    private static final String CACHE_MODE = "PARTITIONED";
+
+    /** */
+    @Parameterized.Parameter()
+    public CacheAtomicityMode atomicity;
+
+    /** */
+    @Parameterized.Parameter(1)
+    public boolean createTables;
+
+    /** @return Test parameters. */
+    @Parameterized.Parameters(name = "atomicity={0}, createTables={1}")
+    public static Collection<?> parameters() {
+        List<Object[]> params = new ArrayList<>();
+
+        for (CacheAtomicityMode atomicity : EnumSet.of(ATOMIC, TRANSACTIONAL)) {
+            for (boolean createTables : new boolean[] {true, false})
+                params.add(new Object[] {atomicity, createTables});
+        }
+
+        return params;
+    }
+
+    /** */
+    protected IgniteEx src;
+
+    /** */
+    protected EmbeddedPostgres postgres;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        DataRegionConfiguration dataRegionConfiguration = new DataRegionConfiguration()
+            .setPersistenceEnabled(true)
+            .setCdcEnabled(true);
+
+        DataStorageConfiguration dataStorageConfiguration = new DataStorageConfiguration()
+            .setWalForceArchiveTimeout(5_000)
+            .setDefaultDataRegionConfiguration(dataRegionConfiguration);
+
+        cfg.setDataStorageConfiguration(dataStorageConfiguration);
+        cfg.setConsistentId(igniteInstanceName);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteToPostgreSqlCdcConsumer getCdcConsumerConfiguration() {
+        IgniteToPostgreSqlCdcConsumer cdcCfg = super.getCdcConsumerConfiguration();
+
+        cdcCfg.setCreateTables(createTables);
+
+        return cdcCfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        cleanPersistenceDir();
+
+        src = startGrid(0);
+
+        src.cluster().state(ClusterState.ACTIVE);
+
+        postgres = EmbeddedPostgres.builder().start();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+
+        postgres.close();
+    }
+
+    /** */
+    @Test
+    public void testSingleColumnKeyDataReplicationWithPrimaryFirst() throws Exception {
+        testSingleColumnKeyDataReplication(false);
+    }
+
+    /** */
+    @Test
+    public void testSingleColumnKeyDataReplicationWithPrimaryLast() throws Exception {
+        testSingleColumnKeyDataReplication(true);
+    }
+
+    /** */
+    public void testSingleColumnKeyDataReplication(boolean isPrimaryLast) throws Exception {
+        String[] tableFields;
+
+        String insertQry = "INSERT INTO T1 VALUES(?, ?)";
+        String updateQry;
+
+        IntConsumer insert;
+        IntConsumer update;
+
+        if (isPrimaryLast) {
+            tableFields = new String[] {"NAME VARCHAR(20)", "ID BIGINT PRIMARY 
KEY"};
+
+            updateQry = "MERGE INTO T1 (NAME, ID) VALUES (?, ?)";
+
+            insert = id -> executeOnIgnite(src, insertQry, "Name" + id, id);
+            update = id -> executeOnIgnite(src, updateQry, id + "Name", id);
+        }
+        else {
+            tableFields = new String[] {"ID BIGINT PRIMARY KEY", "NAME 
VARCHAR(20)"};
+
+            updateQry = "MERGE INTO T1 (ID, NAME) VALUES (?, ?)";
+
+            insert = id -> executeOnIgnite(src, insertQry, id, "Name" + id);
+            update = id -> executeOnIgnite(src, updateQry, id, id + "Name");
+        }
+
+        createTable("T1", tableFields, null, null, null);
+
+        Supplier<Boolean> checkInsert = () -> checkSingleColumnKeyTable(id -> "Name" + id);
+
+        Supplier<Boolean> checkUpdate = () -> checkSingleColumnKeyTable(id -> id + "Name");
+
+        testDataReplication("T1", insert, checkInsert, update, checkUpdate);
+    }
+
+    /** */
+    private boolean checkSingleColumnKeyTable(Function<Long, String> idToName) {
+        String qry = "SELECT ID, NAME FROM T1 ORDER BY ID";
+
+        try (ResultSet res = selectOnPostgreSql(postgres, qry)) {
+            long cnt = 0;
+
+            long id;
+            String curName;
+
+            while (res.next()) {
+                id = res.getLong("ID");
+                curName = res.getString("NAME");
+
+                if (!idToName.apply(id).equals(curName) || cnt != id)
+                    return false;
+
+                cnt++;
+            }
+
+            return cnt == KEYS_CNT;
+        }
+        catch (Exception e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /** Replication with complex SQL key. Data inserted via SQL. */
+    @Test
+    public void testMultiColumnKeyDataReplicationWithSql() throws Exception {
+        IntConsumer insert = id -> executeOnIgnite(
+            src,
+            "INSERT INTO T2 (ID, SUBID, NAME, VAL) VALUES(?, ?, ?, ?)",
+            id,
+            "SUBID",
+            "Name" + id,
+            id * 42
+        );
+
+        IntConsumer update = id -> executeOnIgnite(
+            src,
+            "MERGE INTO T2 (ID, SUBID, NAME, VAL) VALUES(?, ?, ?, ?)",
+            id,
+            "SUBID",
+            id + "Name",
+            id + 42
+        );
+
+        testMultiColumnKeyDataReplication("T2", insert, update);
+    }
+
+    /** Replication with complex SQL key. Data inserted via key-value API. */
+    @Test
+    public void testMultiColumnKeyDataReplicationWithKeyValue() throws Exception {
+        IntConsumer insert = id -> src.cache("T3")
+            .put(
+                new TestKey(id, "SUBID"),
+                new TestVal("Name" + id, id * 42)
+            );
+
+        IntConsumer update = id -> src.cache("T3")
+            .put(
+                new TestKey(id, "SUBID"),
+                new TestVal(id + "Name", id + 42)
+            );
+
+        testMultiColumnKeyDataReplication("T3", insert, update);
+    }
+
+    /** */
+    public void testMultiColumnKeyDataReplication(String tableName, IntConsumer insert, IntConsumer update) throws Exception {
+        String[] tableFields = new String[] {
+            "ID INT NOT NULL",
+            "SUBID VARCHAR(15) NOT NULL",
+            "NAME VARCHAR",
+            "VAL INT"
+        };
+
+        String constraint = "PRIMARY KEY (ID, SUBID)";
+
+        createTable(tableName, tableFields, constraint, TestKey.class.getName(), TestVal.class.getName());
+
+        Supplier<Boolean> checkInsert = () -> checkMultiColumnKeyTable(tableName, id -> "Name" + id, id -> id * 42);
+
+        Supplier<Boolean> checkUpdate = () -> checkMultiColumnKeyTable(tableName, id -> id + "Name", id -> id + 42);
+
+        testDataReplication(tableName, insert, checkInsert, update, checkUpdate);
+    }
+
+    /** */
+    private boolean checkMultiColumnKeyTable(
+        String tableName,
+        Function<Integer, String> idToName,
+        Function<Integer, Integer> idToVal
+    ) {
+        String qry = "SELECT ID, NAME, VAL FROM " + tableName + " ORDER BY ID";
+
+        try (ResultSet res = selectOnPostgreSql(postgres, qry)) {
+            long cnt = 0;
+
+            int id;
+            String curName;
+            int curVal;
+
+            while (res.next()) {
+                id = res.getInt("ID");
+                curName = res.getString("NAME");
+                curVal = res.getInt("VAL");
+
+                if (!idToVal.apply(id).equals(curVal) || !idToName.apply(id).equals(curName) || cnt != id)
+                    return false;
+
+                cnt++;
+            }
+
+            return cnt == KEYS_CNT;
+        }
+        catch (Exception e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /** */
+    private void testDataReplication(
+        String tableName,
+        IntConsumer insert,
+        Supplier<Boolean> checkInsert,
+        IntConsumer update,
+        Supplier<Boolean> checkUpdate
+    ) throws Exception {
+        IgniteInternalFuture<?> fut = startCdc(Stream.of(tableName).collect(Collectors.toSet()));
+
+        try {
+            IntStream.range(0, KEYS_CNT).forEach(insert);
+
+            assertTrue(waitForCondition(waitForTableSize(postgres, tableName, KEYS_CNT), getTestTimeout()));
+
+            assertTrue(checkInsert.get());
+
+            executeOnIgnite(src, "DELETE FROM " + tableName);
+
+            assertTrue(waitForCondition(waitForTableSize(postgres, tableName, 0), getTestTimeout()));
+
+            IntStream.range(0, KEYS_CNT).forEach(insert);
+
+            assertTrue(waitForCondition(waitForTableSize(postgres, tableName, KEYS_CNT), getTestTimeout()));
+
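+            // Apply every update twice in shuffled order to verify duplicate CDC events are applied idempotently.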
+            rangeWithDuplicates(0, KEYS_CNT).forEach(update);
+
+            assertTrue(waitForCondition(checkUpdate::get, getTestTimeout()));
+        }
+        finally {
+            fut.cancel();
+        }
+    }
+
+    /**
+     * Returns a shuffled stream in which every key of the range occurs twice, emulating duplicate change events.
+     *
+     * @param startInclusive Start inclusive.
+     * @param endExclusive End exclusive.
+     */
+    private IntStream rangeWithDuplicates(int startInclusive, int endExclusive) {
+        List<Integer> duplicatedKeys = IntStream.concat(
+                IntStream.range(startInclusive, endExclusive),
+                IntStream.range(startInclusive, endExclusive)
+            )
+            .boxed()
+            .collect(Collectors.toList());
+
+        Collections.shuffle(duplicatedKeys);
+
+        return duplicatedKeys.stream().mapToInt(Integer::intValue);
+    }
+
+    /** */
+    @Test
+    public void testMultipleTableDataReplication() throws Exception {
+        String[] tableFields = new String[] {"ID BIGINT PRIMARY KEY", "NAME 
VARCHAR"};
+
+        createTable("T4", tableFields, null, null, null);
+        createTable("T5", tableFields, null, null, null);
+        createTable("T6", tableFields, null, null, null);
+
+        IgniteInternalFuture<?> fut = startCdc(Stream.of("T4", "T5", 
"T6").collect(Collectors.toSet()));
+
+        try {
+            String insertQry = "INSERT INTO %s VALUES(?, ?)";
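+            // MERGE acts as an upsert in Ignite SQL and exercises the update path of the applier.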
+            String updateQry = "MERGE INTO %s (ID, NAME) VALUES (?, ?)";
+
+            executeOnIgnite(src, String.format(insertQry, "T4"), 1, "Name" + 
1);
+
+            assertTrue(waitForCondition(waitForTableSize(postgres, "T4", 1), 
getTestTimeout()));
+
+            executeOnIgnite(src, String.format(updateQry, "T4"), 1, "Name" + 
2);
+            executeOnIgnite(src, String.format(insertQry, "T4"), 3, "Name" + 
1);
+            executeOnIgnite(src, String.format(insertQry, "T5"), 4, "Name" + 
1);
+            executeOnIgnite(src, String.format(insertQry, "T6"), 5, "Name" + 
5);
+            executeOnIgnite(src, String.format(insertQry, "T6"), 6, "Name" + 
6);
+            executeOnIgnite(src, String.format(updateQry, "T6"), 5, 5 + 
"Name");
+
+            assertTrue(waitForCondition(waitForTableSize(postgres, "T4", 2), 
getTestTimeout()));
+            assertTrue(waitForCondition(waitForTableSize(postgres, "T5", 1), 
getTestTimeout()));
+            assertTrue(waitForCondition(waitForTableSize(postgres, "T6", 2), 
getTestTimeout()));
+
+            assertTrue(checkRow(postgres, "T4", "NAME", "Name" + 2, "ID=1"));
+            assertTrue(checkRow(postgres, "T4", "NAME", "Name" + 1, "ID=3"));
+            assertTrue(checkRow(postgres, "T5", "NAME", "Name" + 1, "ID=4"));
+            assertTrue(checkRow(postgres, "T6", "NAME", 5 + "Name", "ID=5"));
+            assertTrue(checkRow(postgres, "T6", "NAME", "Name" + 6, "ID=6"));
+        }
+        finally {
+            fut.cancel();
+        }
+    }
+
+    /** Starts the CDC consumer for {@code caches} and waits until the corresponding tables appear on PostgreSQL. */
+    private IgniteInternalFuture<?> startCdc(Set<String> caches) throws IgniteInterruptedCheckedException {
+        IgniteInternalFuture<?> fut = startIgniteToPostgreSqlCdcConsumer(src.configuration(), caches, postgres.getPostgresDatabase());
+
+        assertTrue(waitForCondition(waitForTablesCreatedOnPostgres(postgres, caches), getTestTimeout()));
+
+        return fut;
+    }
+
+    /** Creates the table on Ignite and, if the consumer does not create tables, a matching table on PostgreSQL. */
+    private void createTable(String tableName, String[] fields, String constraint, String keyClsName, String valClsName) {
+        StringBuilder fieldsBldr = new StringBuilder();
+
+        A.notEmpty(fields, "Empty fields declaration.");
+
+        for (int i = 0; i < fields.length; ++i) {
+            fieldsBldr.append(fields[i]);
+
+            if (i < fields.length - 1)
+                fieldsBldr.append(",");
+        }
+
+        String constraintQry = constraint == null ? "" : ", " + constraint;
+
+        String createQry = "CREATE TABLE IF NOT EXISTS " + tableName +
+            " (" + fieldsBldr + constraintQry + ")";
+
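+        // Append Ignite-specific parameters: backing cache name, optional key/value type names,
+        // atomicity mode, number of backups and the cache template.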
+        String createQryWithArgs = createQry +
+            "    WITH \"CACHE_NAME=" + tableName + "," +
+            (keyClsName == null ? "" : "KEY_TYPE=" + keyClsName + ",") +
+            (valClsName == null ? "" : "VALUE_TYPE=" + valClsName + ",") +
+            "ATOMICITY=" + atomicity.name() + "," +
+            "BACKUPS=" + BACKUP + "," +
+            "TEMPLATE=" + CACHE_MODE + "\";";
+
+        executeOnIgnite(src, createQryWithArgs);
+
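+        // If the consumer is not expected to create tables, pre-create the PostgreSQL table,
+        // including the "version" column that the applier maintains for each replicated row.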
+        if (!createTables)
+            executeOnPostgreSql(postgres, "CREATE TABLE IF NOT EXISTS " + 
tableName +
+                " (" + fieldsBldr + ", version BYTEA NOT NULL" + constraintQry 
+ ")");
+    }
+
+    /** */
+    @Test
+    public void testQueryEntityError() throws IgniteCheckedException {
+        QueryEntity qryOnlyValType = new QueryEntity()
+            .setTableName("qryOnlyValType")
+            .setValueType("org.apache.ignite.cdc.postgres.TestVal");
+
+        testQueryEntityReplicationError(qryOnlyValType);
+
+        QueryEntity qryOnlyValName = new QueryEntity()
+            .setTableName("qryOnlyValName")
+            .setValueFieldName("name")
+            .addQueryField("name", String.class.getName(), null)
+            .addQueryField("val", Integer.class.getName(), null);
+
+        testQueryEntityReplicationError(qryOnlyValName);
+    }
+
+    /** Starts replication for a cache with the given query entity and expects the CDC consumer to fail. */
+    private void testQueryEntityReplicationError(QueryEntity qryEntity) throws IgniteInterruptedCheckedException {
+        CacheConfiguration<Integer, TestVal> ccfg = new CacheConfiguration<Integer, TestVal>(qryEntity.getTableName())
+            .setQueryEntities(Collections.singletonList(qryEntity));
+
+        src.getOrCreateCache(ccfg);
+
+        IgniteInternalFuture<?> fut = startIgniteToPostgreSqlCdcConsumer(
+            src.configuration(),
+            new HashSet<>(Collections.singletonList(qryEntity.getTableName())),
+            postgres.getPostgresDatabase()
+        );
+
+        assertTrue(waitForCondition(fut::isDone, getTestTimeout()));
+
+        checkFutureEndWithError(fut);
+    }
+
+    /** Asserts that the given future completed with an error. */
+    protected void checkFutureEndWithError(IgniteInternalFuture<?> fut) {
+        assertTrue(fut.error() != null);
+    }
+
+    /** */
+    @Test
+    public void testQueryEntityWithKeyValueFieldNames() throws IgniteCheckedException {
+        QueryEntity qryEntity = new QueryEntity()
+            .setTableName("qryKeyValName")
+            .setKeyFieldName("id")
+            .setValueFieldName("name")
+            .addQueryField("id", Integer.class.getName(), null)
+            .addQueryField("name", String.class.getName(), null)
+            .addQueryField("val", Integer.class.getName(), null);
+
+        int id = 2;
+
+        String name = "1";
+        int val = 3;
+
+        Function<ResultSet, Boolean> check = res -> {
+            try {
+                assertTrue(res.next());
+
+                int actId = res.getInt("id");
+                String actName = res.getString("name");
+                int actVal = res.getInt("val");
+
+                return actId == id && Objects.equals(actName, name) && actVal == val;
+            }
+            catch (Exception e) {
+                return false;
+            }
+        };
+
+        String[] tableFields = new String[] {
+            "ID INT NOT NULL",
+            "NAME VARCHAR",
+            "VAL INT"
+        };
+
+        String constraint = "PRIMARY KEY (ID)";
+
+        testQueryEntityReplicationSuccess(qryEntity, tableFields, constraint, id, new TestVal(name, val), check);
+    }
+
+    /** */
+    @Test
+    public void testQueryEntityWithKeyFieldsAndValName() throws IgniteCheckedException {
+        QueryEntity qryEntity = new QueryEntity()
+            .setTableName("qryKeyFieldsValName")
+            .setKeyFields(new HashSet<>(Arrays.asList("id", "subId")))
+            .setValueFieldName("name")
+            .addQueryField("id", Integer.class.getName(), null)
+            .addQueryField("subId", String.class.getName(), null)
+            .addQueryField("name", String.class.getName(), null)
+            .addQueryField("val", Integer.class.getName(), null);
+
+        int id = 4;
+        String subId = "foobar";
+
+        String name = "5";
+        int val = 0;
+
+        Function<ResultSet, Boolean> check = res -> {
+            try {
+                assertTrue(res.next());
+
+                int actId = res.getInt("id");
+                String actSubId = res.getString("subId");
+                String actName = res.getString("name");
+                int actVal = res.getInt("val");
+
+                return actId == id && Objects.equals(actSubId, subId) && Objects.equals(actName, name) && actVal == val;
+            }
+            catch (Exception e) {
+                return false;
+            }
+        };
+
+        String[] tableFields = new String[] {
+            "ID INT NOT NULL",
+            "SUBID VARCHAR(15) NOT NULL",
+            "NAME VARCHAR",
+            "VAL INT"
+        };
+
+        String constraint = "PRIMARY KEY (ID, SUBID)";
+
+        testQueryEntityReplicationSuccess(
+            qryEntity,
+            tableFields,
+            constraint,
+            new TestKey(id, subId),
+            new TestVal(name, val),
+            check
+        );
+    }
+
+    /** */
+    @Test
+    public void testQueryEntityWithKeyNameValueTypeAndFields() throws IgniteCheckedException {
+        QueryEntity qryEntity = new QueryEntity()
+            .setTableName("TESTING")
+            .setKeyFieldName("id")
+            .setValueType("org.apache.ignite.cdc.postgres.TestVal")
+            .addQueryField("id", Integer.class.getName(), null)
+            .addQueryField("name", String.class.getName(), null)
+            .addQueryField("val", Integer.class.getName(), null);
+
+        int id = 3;
+
+        String name = "test";
+        int val = 9;
+
+        Function<ResultSet, Boolean> check = res -> {
+            try {
+                assertTrue(res.next());
+
+                int actId = res.getInt("id");
+                String actName = res.getString("name");
+                int actVal = res.getInt("val");
+
+                return actId == id && Objects.equals(actName, name) && actVal == val;
+            }
+            catch (Exception e) {
+                return false;
+            }
+        };
+
+        String[] tableFields = new String[] {
+            "ID INT NOT NULL",
+            "NAME VARCHAR",
+            "VAL INT"
+        };
+
+        String constraint = "PRIMARY KEY (ID)";
+
+        testQueryEntityReplicationSuccess(qryEntity, tableFields, constraint, id, new TestVal(name, val), check);
+    }
+
+    /** Replicates a single entry for the given query entity and validates the resulting PostgreSQL row via {@code checkTable}. */
+    public <K, V> void testQueryEntityReplicationSuccess(
+        QueryEntity qryEntity,
+        String[] fields,
+        String constraint,
+        K key,
+        V val,
+        Function<ResultSet, Boolean> checkTable
+    ) throws IgniteCheckedException {
+        CacheConfiguration<K, V> ccfg = new CacheConfiguration<K, V>(qryEntity.getTableName())
+            .setQueryEntities(Collections.singletonList(qryEntity));
+
+        IgniteCache<K, V> cache = src.getOrCreateCache(ccfg);
+
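+        // As in createTable(): when the consumer does not create tables, pre-create the target table with the "version" column.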
+        if (!createTables) {
+            StringBuilder fieldsBldr = new StringBuilder();
+
+            A.notEmpty(fields, "Empty fields declaration.");
+
+            for (int i = 0; i < fields.length; ++i) {
+                fieldsBldr.append(fields[i]);
+
+                if (i < fields.length - 1)
+                    fieldsBldr.append(",");
+            }
+
+            String constraintQry = constraint == null ? "" : ", " + constraint;
+
+            executeOnPostgreSql(postgres, "CREATE TABLE IF NOT EXISTS " + 
qryEntity.getTableName() +
+                " (" + fieldsBldr + ", version BYTEA NOT NULL" + constraintQry 
+ ")");
+        }
+
+        cache.put(key, val);
+
+        IgniteInternalFuture<?> fut = startCdc(new HashSet<>(Collections.singletonList(qryEntity.getTableName())));
+
+        try {
+            assertTrue(waitForCondition(waitForTableSize(postgres, qryEntity.getTableName(), 1), getTestTimeout()));
+
+            ResultSet set = selectOnPostgreSql(postgres, "SELECT * FROM " + qryEntity.getTableName());
+
+            assertTrue(checkTable.apply(set));
+        }
+        finally {
+            fut.cancel();
+        }
+    }
+
+    /** Composite key with ID and SUBID fields. */
+    private static class TestKey {
+        /** */
+        private final int id;
+
+        /** */
+        private final String subId;
+
+        /** */
+        public TestKey(int id, String subId) {
+            this.id = id;
+            this.subId = subId;
+        }
+    }
+
+    /** Value with NAME and VAL fields. */
+    private static class TestVal {
+        /** */
+        private final String name;
+
+        /** */
+        private final int val;
+
+        /** */
+        public TestVal(String name, int val) {
+            this.name = name;
+            this.val = val;
+        }
+    }
+}
diff --git a/modules/cdc-ext/src/test/resources/replication/ignite-to-postgres.xml b/modules/cdc-ext/src/test/resources/replication/ignite-to-postgres.xml
new file mode 100644
index 00000000..727194d0
--- /dev/null
+++ b/modules/cdc-ext/src/test/resources/replication/ignite-to-postgres.xml
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
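+<!-- Template used by the replication tests: tokens such as {INSTANCE_NAME}, {CONSISTENT_ID},
+     {BATCH_SIZE}, {ONLY_PRIMARY}, {CREATE_TABLE} and {DB_URL} are substituted before the context is loaded. -->
+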
+<beans xmlns="http://www.springframework.org/schema/beans";
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+       xmlns:util="http://www.springframework.org/schema/util";
+       xsi:schemaLocation="
+        http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans.xsd
+        http://www.springframework.org/schema/util 
http://www.springframework.org/schema/util/spring-util.xsd";>
+    <bean id="grid.cfg" 
class="org.apache.ignite.configuration.IgniteConfiguration">
+        <property name="igniteInstanceName" value="{INSTANCE_NAME}" />
+        <property name="peerClassLoadingEnabled" value="true" />
+        <property name="localHost" value="127.0.0.1" />
+        <property name="consistentId" value="{CONSISTENT_ID}" />
+
+        <property name="dataStorageConfiguration">
+            <bean class="org.apache.ignite.configuration.DataStorageConfiguration">
+                <property name="defaultDataRegionConfiguration">
+                    <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
+                        <property name="cdcEnabled" value="true" />
+                        <property name="persistenceEnabled" value="true" />
+                    </bean>
+                </property>
+            </bean>
+        </property>
+    </bean>
+
+    <bean id="cdc.cfg" class="org.apache.ignite.cdc.CdcConfiguration">
+        <property name="consumer">
+            <bean class="org.apache.ignite.cdc.postgresql.IgniteToPostgreSqlCdcConsumer">
+                <property name="caches">
+                    <list>
+                        <value>T1</value>
+                        <value>T2</value>
+                        <value>T3</value>
+                        <value>T4</value>
+                        <value>T5</value>
+                        <value>T6</value>
+                        <value>qryOnlyValType</value>
+                        <value>qryOnlyValName</value>
+                        <value>qryKeyValName</value>
+                        <value>qryKeyFieldsValName</value>
+                        <value>TESTING</value>
+                    </list>
+                </property>
+                <property name="batchSize" value="{BATCH_SIZE}" />
+                <property name="onlyPrimary" value="{ONLY_PRIMARY}" />
+                <property name="createTables" value="{CREATE_TABLE}" />
+                <property name="dataSource" ref="dataSource" />
+            </bean>
+        </property>
+    </bean>
+
+    <bean id="dataSource" class="org.apache.commons.dbcp2.BasicDataSource" 
destroy-method="close">
+        <property name="driverClassName" value="org.postgresql.Driver"/>
+        <property name="url" value="{DB_URL}"/>
+        <property name="username" value="{DB_USER}"/>
+        <property name="password" value="{DB_PASSWORD}"/>
+
+        <!-- Optional pool configuration -->
+        <property name="initialSize" value="3"/>
+        <property name="maxTotal" value="10"/>
+        <property name="maxIdle" value="3"/>
+        <property name="minIdle" value="1"/>
+        <property name="validationQuery" value="SELECT 1"/>
+        <property name="testOnBorrow" value="true"/>
+    </bean>
+</beans>
diff --git a/parent-internal/pom.xml b/parent-internal/pom.xml
index 9d55fd78..e6f5e701 100644
--- a/parent-internal/pom.xml
+++ b/parent-internal/pom.xml
@@ -40,6 +40,8 @@
         <spring61.version>6.1.15</spring61.version>
         <activemq.version>5.12.0</activemq.version>
         <aspectj.version>1.8.13</aspectj.version>
+        <embedded.postgres.version>2.0.3</embedded.postgres.version>
+        <commons.dbcp2.version>2.13.0</commons.dbcp2.version>
 
         <!--
            NOTE: The dependency versions below must be changed in the release branch up to
