This is an automated email from the ASF dual-hosted git repository.
timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git
The following commit(s) were added to refs/heads/master by this push:
new a6027e64 IGNITE-25774 Add Postgres CDC consumer (#311)
a6027e64 is described below
commit a6027e64669b0af63a126f794fc1b1ab8bfff7d6
Author: Maksim Davydov <[email protected]>
AuthorDate: Fri Aug 1 15:37:44 2025 +0300
IGNITE-25774 Add Postgres CDC consumer (#311)
---
modules/cdc-ext/pom.xml | 26 +
.../postgresql/IgniteToPostgreSqlCdcApplier.java | 672 ++++++++++++++++++++
.../postgresql/IgniteToPostgreSqlCdcConsumer.java | 246 ++++++++
.../cdc/kafka/CdcKafkaReplicationAppsTest.java | 2 +-
.../CdcPostgreSqlReplicationAbstractTest.java | 178 ++++++
.../postgres/CdcPostgreSqlReplicationAppsTest.java | 45 ++
.../cdc/postgres/CdcPostgreSqlReplicationTest.java | 699 +++++++++++++++++++++
.../resources/replication/ignite-to-postgres.xml | 84 +++
parent-internal/pom.xml | 2 +
9 files changed, 1953 insertions(+), 1 deletion(-)
diff --git a/modules/cdc-ext/pom.xml b/modules/cdc-ext/pom.xml
index c8052fe2..014db80c 100644
--- a/modules/cdc-ext/pom.xml
+++ b/modules/cdc-ext/pom.xml
@@ -164,8 +164,34 @@
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>io.zonky.test</groupId>
+ <artifactId>embedded-postgres</artifactId>
+ <version>${embedded.postgres.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-dbcp2</artifactId>
+ <version>${commons.dbcp2.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>io.zonky.test.postgres</groupId>
+ <artifactId>embedded-postgres-binaries-bom</artifactId>
+ <version>14.5.0</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
<build>
<plugins>
<plugin>
diff --git
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/IgniteToPostgreSqlCdcApplier.java
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/IgniteToPostgreSqlCdcApplier.java
new file mode 100644
index 00000000..4399880f
--- /dev/null
+++
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/IgniteToPostgreSqlCdcApplier.java
@@ -0,0 +1,672 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.postgresql;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import javax.sql.DataSource;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.CacheEntryVersion;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cdc.CdcCacheEvent;
+import org.apache.ignite.cdc.CdcEvent;
+import org.apache.ignite.internal.util.typedef.F;
+
+import static
org.apache.ignite.internal.processors.cache.GridCacheUtils.UNDEFINED_CACHE_ID;
+
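+/**
+ * Applies Ignite CDC events to PostgreSQL: keeps per-cache UPSERT/DELETE statements built from {@link QueryEntity}
+ * metadata and executes data events through batched {@link PreparedStatement}s.
+ */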
+public class IgniteToPostgreSqlCdcApplier {
+ /** */
+ public static final String DFLT_SQL_TYPE = "OTHER";
+
+ /** */
+ public static final Map<String, String> JAVA_TO_SQL_TYPES;
+
+ /** */
+ public static final Set<String> SQL_TYPES_WITH_PRECISION_ONLY;
+
+ /** */
+ public static final Set<String> SQL_TYPES_WITH_PRECISION_AND_SCALE;
+
+    static {
+        // Ignite field type (Java class name) -> PostgreSQL column type.
+        Map<String, String> javaToSqlTypes = new HashMap<>();
+
+        javaToSqlTypes.put("java.lang.String", "VARCHAR");
+        javaToSqlTypes.put("java.lang.Integer", "INT");
+        javaToSqlTypes.put("int", "INT");
+        javaToSqlTypes.put("java.lang.Long", "BIGINT");
+        javaToSqlTypes.put("long", "BIGINT");
+        javaToSqlTypes.put("java.lang.Boolean", "BOOLEAN");
+        javaToSqlTypes.put("boolean", "BOOLEAN");
+        javaToSqlTypes.put("java.lang.Double", "DOUBLE PRECISION");
+        javaToSqlTypes.put("double", "DOUBLE PRECISION");
+        javaToSqlTypes.put("java.lang.Float", "REAL");
+        javaToSqlTypes.put("float", "REAL");
+        javaToSqlTypes.put("java.math.BigDecimal", "DECIMAL");
+        javaToSqlTypes.put("java.lang.Short", "SMALLINT");
+        javaToSqlTypes.put("short", "SMALLINT");
+        javaToSqlTypes.put("java.lang.Byte", "SMALLINT");
+        javaToSqlTypes.put("byte", "SMALLINT");
+        javaToSqlTypes.put("java.util.UUID", "UUID");
+        javaToSqlTypes.put("[B", "BYTEA");
+        javaToSqlTypes.put("java.lang.Object", "OTHER");
+
+        JAVA_TO_SQL_TYPES = Collections.unmodifiableMap(javaToSqlTypes);
+
+        // VARCHAR is the only mapped type that takes a single length modifier. DOUBLE PRECISION accepts no type
+        // modifier in PostgreSQL, so generating "DOUBLE PRECISION(p)" would yield invalid DDL.
+        SQL_TYPES_WITH_PRECISION_ONLY = Collections.singleton("VARCHAR");
+
+        // DECIMAL(p, s) is valid DDL. REAL accepts no type modifier either, so it must not be listed here.
+        SQL_TYPES_WITH_PRECISION_AND_SCALE = Collections.singleton("DECIMAL");
+    }
+
+ /** */
+ private final DataSource dataSrc;
+
+ /** */
+ private final long batchSize;
+
+ /** */
+ private final IgniteLogger log;
+
+ /** */
+ private final Map<Integer, String> cacheIdToUpsertQry = new HashMap<>();
+
+ /** */
+ private final Map<Integer, String> cacheIdToDeleteQry = new HashMap<>();
+
+ /** */
+ private final Map<Integer, Set<String>> cacheIdToPrimaryKeys = new
HashMap<>();
+
+ /** */
+ private final Map<Integer, Set<String>> cacheIdToFields = new HashMap<>();
+
+ /** */
+ private final Set<Object> curKeys = new HashSet<>();
+
+    /**
+     * Creates an applier bound to the given data source.
+     *
+     * @param dataSrc {@link DataSource} used to obtain connections to PostgreSQL.
+     * @param batchSize Maximum number of CDC events accumulated before a batch is executed.
+     * @param log {@link IgniteLogger} used to report CDC processing events and failures.
+     */
+    public IgniteToPostgreSqlCdcApplier(DataSource dataSrc, long batchSize, IgniteLogger log) {
+        this.log = log;
+        this.batchSize = batchSize;
+        this.dataSrc = dataSrc;
+    }
+
+    /**
+     * Applies CDC events to PostgreSQL using one connection obtained from the data source.
+     * <p>
+     * Auto-commit is switched off and every executed batch is committed explicitly. If applying fails, the work that
+     * has not been committed yet is rolled back and the failure is rethrown as an {@link IgniteException}.
+     *
+     * @param evts an {@link Iterator} of {@link CdcEvent} objects to be applied
+     * @return the total number of events successfully batched and executed
+     * @throws IgniteException if the events could not be applied
+     */
+    public long applyEvents(Iterator<CdcEvent> evts) {
+        try (Connection conn = dataSrc.getConnection()) {
+            try {
+                // Commits are handled manually (once per executed batch) for clearer transaction boundaries.
+                conn.setAutoCommit(false);
+
+                return applyEvents(conn, evts);
+            }
+            catch (SQLException | RuntimeException e) {
+                // The private helpers wrap SQLException into IgniteException, so runtime failures have to be
+                // handled here too - otherwise a failed batch would never be rolled back.
+                try {
+                    conn.rollback();
+                }
+                catch (SQLException rollbackErr) {
+                    // Keep the original failure as the primary cause.
+                    e.addSuppressed(rollbackErr);
+                }
+
+                throw e;
+            }
+        }
+        catch (Throwable e) {
+            log.error("Error during CDC event application: " + e.getMessage(), e);
+
+            throw new IgniteException("Failed to apply CDC events", e);
+        }
+    }
+
+    /**
+     * Groups consecutive events that target the same cache and the same operation kind (upsert or delete) into JDBC
+     * batches and executes them. A batch is flushed when the cache or the operation kind changes, when
+     * {@code batchSize} keys are pending, or when a key is about to be repeated within the pending batch.
+     *
+     * @param conn connection to PostgreSql
+     * @param evts an {@link Iterator} of {@link CdcEvent} objects to be applied
+     * @return the total number of events successfully batched and executed
+     */
+    private long applyEvents(Connection conn, Iterator<CdcEvent> evts) {
+        long evtsApplied = 0;
+
+        int currCacheId = UNDEFINED_CACHE_ID;
+        boolean prevOpIsDelete = false;
+
+        PreparedStatement curPrepStmt = null;
+
+        // Keys left over from a previously failed invocation belong to a batch that no longer exists.
+        curKeys.clear();
+
+        try {
+            while (evts.hasNext()) {
+                CdcEvent evt = evts.next();
+
+                if (log.isDebugEnabled())
+                    log.debug("Event received [evt=" + evt + ']');
+
+                boolean isDelete = evt.value() == null;
+
+                if (currCacheId != evt.cacheId() || prevOpIsDelete != isDelete) {
+                    if (curPrepStmt != null) {
+                        evtsApplied += executeBatch(conn, curPrepStmt);
+
+                        // The statement is replaced below, release it instead of leaking it until connection close.
+                        closeStatement(curPrepStmt);
+
+                        curPrepStmt = null;
+                    }
+
+                    currCacheId = evt.cacheId();
+                    prevOpIsDelete = isDelete;
+
+                    curPrepStmt = prepareStatement(conn, evt);
+                }
+
+                if (curKeys.size() >= batchSize || curKeys.contains(evt.key()))
+                    evtsApplied += executeBatch(conn, curPrepStmt);
+
+                addEvent(curPrepStmt, evt);
+            }
+
+            if (!curKeys.isEmpty())
+                evtsApplied += executeBatch(conn, curPrepStmt);
+
+            return evtsApplied;
+        }
+        finally {
+            closeStatement(curPrepStmt);
+        }
+    }
+
+    /**
+     * Closes the given statement. A failure to close is logged and not propagated.
+     *
+     * @param stmt statement to close, may be {@code null}
+     */
+    private void closeStatement(PreparedStatement stmt) {
+        if (stmt == null)
+            return;
+
+        try {
+            stmt.close();
+        }
+        catch (SQLException e) {
+            log.warning("Failed to close prepared statement: " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Executes the statements accumulated in the given prepared statement and commits the transaction. The set of
+     * pending keys is cleared before the batch is executed.
+     *
+     * @param conn connection to PostgreSql
+     * @param curPrepStmt {@link PreparedStatement} holding the pending batch
+     * @return the number of entries in the executed batch (one entry per added {@link CdcEvent})
+     */
+    private int executeBatch(Connection conn, PreparedStatement curPrepStmt) {
+        try {
+            curKeys.clear();
+
+            if (log.isDebugEnabled())
+                log.debug("Applying batch " + curPrepStmt.toString());
+
+            if (curPrepStmt.isClosed())
+                throw new IgniteException("Tried to execute on closed prepared statement!");
+
+            int executedCnt = curPrepStmt.executeBatch().length;
+
+            // Auto-commit is expected to be off: the whole batch is committed at once.
+            conn.commit();
+
+            return executedCnt;
+        }
+        catch (SQLException sqlErr) {
+            log.error(sqlErr.getMessage(), sqlErr);
+
+            throw new IgniteException(sqlErr);
+        }
+    }
+
+    /**
+     * Creates a prepared statement for the event: the DELETE query of the cache when the event carries no value,
+     * the UPSERT query otherwise.
+     *
+     * @param conn connection to PostgreSql
+     * @param evt {@link CdcEvent}
+     * @return relevant {@link PreparedStatement}
+     */
+    private PreparedStatement prepareStatement(Connection conn, CdcEvent evt) {
+        int cacheId = evt.cacheId();
+
+        Map<Integer, String> queries = evt.value() == null ? cacheIdToDeleteQry : cacheIdToUpsertQry;
+
+        String sqlQry = queries.get(cacheId);
+
+        if (sqlQry == null)
+            throw new IgniteException("No SQL query is found for cacheId=" + cacheId);
+
+        if (log.isDebugEnabled())
+            log.debug("Statement updated [cacheId=" + cacheId + ", sqlQry=" + sqlQry + ']');
+
+        try {
+            return conn.prepareStatement(sqlQry);
+        }
+        catch (SQLException sqlErr) {
+            log.error(sqlErr.getMessage(), sqlErr);
+
+            throw new IgniteException(sqlErr);
+        }
+    }
+
+    /**
+     * Binds the event to the statement parameters and adds it to the pending batch.
+     * <p>
+     * For a removal (no value) only the primary key fields are bound. Otherwise all fields of the cache are bound in
+     * their registered order, followed by the encoded entry version.
+     *
+     * @param curPrepStmt current {@link PreparedStatement}
+     * @param evt {@link CdcEvent}
+     */
+    private void addEvent(PreparedStatement curPrepStmt, CdcEvent evt) {
+        try {
+            Object key = evt.key();
+            Object val = evt.value();
+
+            boolean removal = val == null;
+
+            Set<String> primaryKeys = cacheIdToPrimaryKeys.get(evt.cacheId());
+            Set<String> boundFields = removal ? primaryKeys : cacheIdToFields.get(evt.cacheId());
+
+            BinaryObject keyObj = key instanceof BinaryObject ? (BinaryObject)key : null;
+            BinaryObject valObj = val instanceof BinaryObject ? (BinaryObject)val : null;
+
+            int idx = 1;
+
+            for (String field : boundFields) {
+                Object obj;
+
+                // Key fields are read from the key, everything else from the value. A non-binary key or value is
+                // bound as is.
+                if (primaryKeys.contains(field))
+                    obj = keyObj == null ? key : keyObj.field(field);
+                else
+                    obj = valObj == null ? val : valObj.field(field);
+
+                addObject(curPrepStmt, idx, obj);
+
+                idx++;
+            }
+
+            // The version placeholder is the last parameter of the upsert statement.
+            if (!removal)
+                curPrepStmt.setBytes(idx, encodeVersion(evt.version()));
+
+            curKeys.add(key);
+
+            curPrepStmt.addBatch();
+        }
+        catch (Throwable err) {
+            log.error(err.getMessage(), err);
+
+            throw new IgniteException(err);
+        }
+    }
+
+    /**
+     * Binds a single value to the statement, picking the typed setter that matches the runtime class of the value.
+     * {@code null} and values of any other class are passed to {@link PreparedStatement#setObject(int, Object)}.
+     *
+     * @param curPrepStmt {@link PreparedStatement}
+     * @param idx value index in {@link PreparedStatement}
+     * @param obj value
+     * @throws SQLException if the value could not be bound
+     */
+    private void addObject(PreparedStatement curPrepStmt, int idx, Object obj) throws SQLException {
+        if (obj instanceof String)
+            curPrepStmt.setString(idx, (String)obj);
+        else if (obj instanceof Boolean)
+            curPrepStmt.setBoolean(idx, (Boolean)obj);
+        else if (obj instanceof Byte)
+            curPrepStmt.setByte(idx, (Byte)obj);
+        else if (obj instanceof Short)
+            curPrepStmt.setShort(idx, (Short)obj);
+        else if (obj instanceof Integer)
+            curPrepStmt.setInt(idx, (Integer)obj);
+        else if (obj instanceof Long)
+            curPrepStmt.setLong(idx, (Long)obj);
+        else if (obj instanceof Float)
+            curPrepStmt.setFloat(idx, (Float)obj);
+        else if (obj instanceof Double)
+            curPrepStmt.setDouble(idx, (Double)obj);
+        else if (obj instanceof BigDecimal)
+            curPrepStmt.setBigDecimal(idx, (BigDecimal)obj);
+        else if (obj instanceof byte[])
+            curPrepStmt.setBytes(idx, (byte[])obj);
+        else if (obj instanceof UUID)
+            curPrepStmt.setObject(idx, obj, Types.OTHER); // PostgreSQL expects UUID as OTHER
+        else
+            curPrepStmt.setObject(idx, obj); // Also reached for null: no instanceof check above matches it.
+    }
+
+    /**
+     * Registers the caches described by the events: for every cache the UPSERT and DELETE queries, the primary key
+     * fields and the full field set are remembered by cache id. Each cache must have exactly one {@link QueryEntity}.
+     *
+     * @param evts an {@link Iterator} of {@link CdcCacheEvent} objects to apply
+     * @param createTables tables creation flag. If true - attempt to create tables will be made.
+     * @return Number of applied events.
+     */
+    public long applyCacheEvents(Iterator<CdcCacheEvent> evts, boolean createTables) {
+        long applied = 0;
+
+        for (; evts.hasNext(); applied++) {
+            CdcCacheEvent cacheEvt = evts.next();
+
+            int cacheId = cacheEvt.cacheId();
+
+            if (cacheEvt.queryEntities().size() != 1)
+                throw new IgniteException("There should be exactly 1 QueryEntity for cacheId: " + cacheId);
+
+            QueryEntity entity = cacheEvt.queryEntities().iterator().next();
+
+            if (createTables)
+                createTableIfNotExists(entity);
+
+            cacheIdToUpsertQry.put(cacheId, getUpsertSqlQry(entity));
+            cacheIdToDeleteQry.put(cacheId, getDeleteSqlQry(entity));
+            cacheIdToPrimaryKeys.put(cacheId, getPrimaryKeys(entity));
+            cacheIdToFields.put(cacheId, entity.getFields().keySet());
+
+            if (createTables && log.isInfoEnabled()) {
+                log.info("Cache table created [tableName=" + entity.getTableName() +
+                    ", columns=" + entity.getFields().keySet() + ']');
+            }
+        }
+
+        return applied;
+    }
+
+    /**
+     * Executes the {@code CREATE TABLE IF NOT EXISTS} statement generated for the entity on a separate connection.
+     *
+     * @param entity the {@link QueryEntity} describing the table schema to create
+     */
+    private void createTableIfNotExists(QueryEntity entity) {
+        String ddl = getCreateTableSqlStatement(entity);
+
+        try (Connection conn = dataSrc.getConnection(); Statement stmt = conn.createStatement()) {
+            stmt.execute(ddl);
+        }
+        catch (SQLException sqlErr) {
+            log.error(sqlErr.getMessage(), sqlErr);
+
+            throw new IgniteException(sqlErr);
+        }
+    }
+
+    /**
+     * Builds the DDL for the table backing the entity. Example for fields {@code id}, {@code name} with key {@code id}:
+     * <pre>{@code
+     * CREATE TABLE IF NOT EXISTS my_table (id INT, name VARCHAR, version BYTEA NOT NULL, PRIMARY KEY (id))
+     * }</pre>
+     *
+     * @param entity QueryEntity instance describing the cache structure.
+     * @return SQL statement for creating a table.
+     */
+    private String getCreateTableSqlStatement(QueryEntity entity) {
+        StringBuilder ddl = new StringBuilder("CREATE TABLE IF NOT EXISTS ");
+
+        ddl.append(entity.getTableName()).append(" (");
+
+        addFieldsAndTypes(entity, ddl);
+
+        // Extra column holding the encoded entry version, then the primary key constraint.
+        ddl.append(", version BYTEA NOT NULL").append(", PRIMARY KEY (");
+
+        addPrimaryKeys(entity, ddl);
+
+        return ddl.append("))").toString();
+    }
+
+    /**
+     * Appends a comma-separated {@code name TYPE} list for all entity fields. A positive precision is appended as
+     * {@code (p)} for types listed in {@link #SQL_TYPES_WITH_PRECISION_ONLY} and as {@code (p)} or {@code (p, s)} for
+     * types listed in {@link #SQL_TYPES_WITH_PRECISION_AND_SCALE}.
+     *
+     * @param entity QueryEntity instance describing the cache structure.
+     * @param sql Target StringBuilder where the result will be appended.
+     */
+    private void addFieldsAndTypes(QueryEntity entity, StringBuilder sql) {
+        boolean first = true;
+
+        for (Map.Entry<String, String> fld : entity.getFields().entrySet()) {
+            if (!first)
+                sql.append(", ");
+
+            first = false;
+
+            String name = fld.getKey();
+            String type = JAVA_TO_SQL_TYPES.getOrDefault(fld.getValue(), DFLT_SQL_TYPE);
+
+            sql.append(name).append(" ").append(type);
+
+            Integer precision = entity.getFieldsPrecision().get(name);
+            Integer scale = entity.getFieldsScale().get(name);
+
+            if (precision == null || precision <= 0)
+                continue;
+
+            if (SQL_TYPES_WITH_PRECISION_ONLY.contains(type))
+                sql.append("(").append(precision).append(")");
+            else if (SQL_TYPES_WITH_PRECISION_AND_SCALE.contains(type)) {
+                sql.append("(").append(precision);
+
+                if (scale != null && scale >= 0)
+                    sql.append(", ").append(scale);
+
+                sql.append(")");
+            }
+        }
+    }
+
+    /**
+     * Generates a parameterized SQL UPSERT (INSERT ... ON CONFLICT DO UPDATE) query for the given
+     * {@link QueryEntity}. An existing row is overwritten only if its stored version is lower than the incoming one.
+     * Example for fields {@code id}, {@code name} with key {@code id}:
+     * <pre>{@code
+     * INSERT INTO my_table (id, name, version) VALUES (?, ?, ?)
+     * ON CONFLICT (id) DO UPDATE SET version = EXCLUDED.version, name = EXCLUDED.name
+     * WHERE my_table.version < EXCLUDED.version
+     * }</pre>
+     *
+     * Notes:
+     * <ul>
+     *     <li>The {@code version} column is appended after the entity fields.</li>
+     *     <li>Primary key fields are excluded from the {@code DO UPDATE SET} clause.</li>
+     *     <li>All columns are assigned {@code ?} placeholders for use with {@link PreparedStatement}.</li>
+     * </ul>
+     *
+     * @param entity the {@link QueryEntity} describing the table, fields, and primary keys
+     * @return a SQL UPSERT query string with parameter placeholders and version conflict resolution
+     */
+    private String getUpsertSqlQry(QueryEntity entity) {
+        String tblName = entity.getTableName();
+
+        StringBuilder sql = new StringBuilder("INSERT INTO ").append(tblName).append(" (");
+
+        addFields(entity, sql);
+
+        // One placeholder per entity field plus one for the version column.
+        int placeholders = entity.getFields().size() + 1;
+
+        sql.append(", version) VALUES (")
+            .append(String.join(", ", Collections.nCopies(placeholders, "?")))
+            .append(") ON CONFLICT (");
+
+        addPrimaryKeys(entity, sql);
+
+        sql.append(") DO UPDATE SET ");
+
+        addUpdateFields(entity, sql);
+
+        return sql.append(" WHERE ").append(tblName).append(".version < EXCLUDED.version").toString();
+    }
+
+    /**
+     * Appends the entity field names, separated by {@code ", "}, in the iteration order of the fields map.
+     *
+     * @param entity QueryEntity instance describing the cache structure.
+     * @param sql Target StringBuilder where the result will be appended.
+     */
+    private void addFields(QueryEntity entity, StringBuilder sql) {
+        sql.append(String.join(", ", entity.getFields().keySet()));
+    }
+
+    /**
+     * Appends the {@code col = EXCLUDED.col} assignments of the {@code DO UPDATE SET} clause: the {@code version}
+     * column first, then every entity field that is not part of the primary key.
+     *
+     * @param entity QueryEntity instance describing the cache structure.
+     * @param sql Target StringBuilder where the resulting SQL fragment will be appended.
+     */
+    private void addUpdateFields(QueryEntity entity, StringBuilder sql) {
+        Set<String> primaryFields = getPrimaryKeys(entity);
+
+        String sep = "";
+
+        for (String col : F.concat(false, "version", entity.getFields().keySet())) {
+            if (primaryFields.contains(col))
+                continue;
+
+            sql.append(sep).append(col).append(" = EXCLUDED.").append(col);
+
+            sep = ", ";
+        }
+    }
+
+    /**
+     * Generates a parameterized SQL DELETE query for the given {@link QueryEntity}.
+     * Example:
+     * <pre>{@code
+     * // For a key: id
+     * DELETE FROM my_table WHERE (id = ?)
+     * }</pre>
+     *
+     * For a composite primary key all key columns are combined with AND:
+     * <pre>{@code
+     * // For a composite key: id1, id2
+     * DELETE FROM my_table WHERE (id1 = ? AND id2 = ?)
+     * }</pre>
+     *
+     * @param entity the {@link QueryEntity} describing the table and its primary keys
+     * @return a SQL DELETE query string with parameter placeholders for primary key values
+     */
+    private String getDeleteSqlQry(QueryEntity entity) {
+        StringBuilder qry = new StringBuilder("DELETE FROM ").append(entity.getTableName()).append(" WHERE (");
+
+        String sep = "";
+
+        for (String keyCol : getPrimaryKeys(entity)) {
+            qry.append(sep).append(keyCol).append(" = ?");
+
+            sep = " AND ";
+        }
+
+        return qry.append(')').toString();
+    }
+
+    /**
+     * Appends the primary key field names of the entity, separated by {@code ", "}.
+     *
+     * @param entity QueryEntity instance describing the cache structure.
+     * @param sql Target StringBuilder where the resulting SQL fragment will be appended.
+     */
+    private void addPrimaryKeys(QueryEntity entity, StringBuilder sql) {
+        sql.append(String.join(", ", getPrimaryKeys(entity)));
+    }
+
+    /**
+     * Retrieves the primary key field names from the provided {@link QueryEntity}.
+     * If the entity declares no key fields, the single key field name is used instead.
+     *
+     * @param entity The {@link QueryEntity} representing the table and its metadata.
+     * @return The key fields of the entity, or a singleton set with its key field name.
+     * @throws IgniteException if the entity declares neither key fields nor a key field name.
+     */
+    private Set<String> getPrimaryKeys(QueryEntity entity) {
+        Set<String> keyFields = entity.getKeyFields();
+
+        if (keyFields != null && !keyFields.isEmpty())
+            return keyFields;
+
+        String keyFieldName = entity.getKeyFieldName();
+
+        if (keyFieldName == null) {
+            throw new IgniteException("Couldn't determine key field for queryEntity [tableName=" +
+                entity.getTableName() + ']');
+        }
+
+        return Collections.singleton(keyFieldName);
+    }
+
+    /**
+     * Encodes the components of a {@link CacheEntryVersion} into a 16-byte array in big-endian byte order.
+     * <p>
+     * Layout:
+     * <ul>
+     *     <li>4 bytes - {@code topologyVersion} (int)</li>
+     *     <li>8 bytes - {@code order} (long)</li>
+     *     <li>4 bytes - {@code nodeOrder} (int)</li>
+     * </ul>
+     * The most significant component comes first, so comparing two encoded arrays byte by byte (as the generated
+     * upsert query does on the {@code BYTEA} column) orders versions by topology version, then order, then node order.
+     * NOTE(review): this presumably relies on all three components being non-negative - confirm.
+     *
+     * @param ver the {@link CacheEntryVersion} instance to encode
+     * @return a 16-byte array representing the version in big-endian format
+     */
+    private byte[] encodeVersion(CacheEntryVersion ver) {
+        return ByteBuffer.allocate(16)
+            .order(ByteOrder.BIG_ENDIAN)
+            .putInt(ver.topologyVersion())
+            .putLong(ver.order())
+            .putInt(ver.nodeOrder())
+            .array();
+    }
+}
diff --git
a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/IgniteToPostgreSqlCdcConsumer.java
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/IgniteToPostgreSqlCdcConsumer.java
new file mode 100644
index 00000000..c24fee44
--- /dev/null
+++
b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/postgresql/IgniteToPostgreSqlCdcConsumer.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.postgresql;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.stream.Collectors;
+import javax.sql.DataSource;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryType;
+import org.apache.ignite.cdc.CdcCacheEvent;
+import org.apache.ignite.cdc.CdcConsumer;
+import org.apache.ignite.cdc.CdcEvent;
+import org.apache.ignite.cdc.TypeMapping;
+import org.apache.ignite.internal.processors.metric.MetricRegistryImpl;
+import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.metric.MetricRegistry;
+import org.apache.ignite.resources.LoggerResource;
+
+/**
+ * This class represents a consumer component that replicates cache changes
from Apache Ignite to PostgreSQL using
+ * Change Data Capture (CDC) mechanism. It applies events to PostgreSQL via
batch-prepared SQL statements, ensuring
+ * efficient handling of large volumes of updates.
+ *
+ * <p>Additionally, it provides methods for initializing connections, managing
transactions, and performing atomic batches
+ * of writes.</p>
+ */
+public class IgniteToPostgreSqlCdcConsumer implements CdcConsumer {
+ /** */
+ public static final String EVTS_SENT_CNT = "EventsCount";
+
+ /** */
+ public static final String EVTS_SENT_CNT_DESC = "Count of events applied
to PostgreSQL";
+
+ /** */
+ public static final String LAST_EVT_SENT_TIME = "LastEventTime";
+
+ /** */
+ public static final String LAST_EVT_SENT_TIME_DESC = "Timestamp of last
applied event to PostgreSQL";
+
+ /** */
+ private static final boolean DFLT_IS_ONLY_PRIMARY = true;
+
+ /** */
+ private static final long DFLT_BATCH_SIZE = 1024;
+
+ /** */
+ private static final boolean DFLT_CREATE_TABLES = false;
+
+ /** */
+ private DataSource dataSrc;
+
+ /** Collection of cache names which will be replicated to PostgreSQL. */
+ private Collection<String> caches;
+
+ /** */
+ private boolean onlyPrimary = DFLT_IS_ONLY_PRIMARY;
+
+ /** */
+ private long batchSize = DFLT_BATCH_SIZE;
+
+ /** */
+ private boolean createTables = DFLT_CREATE_TABLES;
+
+ /** Log. */
+ @LoggerResource
+ private IgniteLogger log;
+
+ /** Cache IDs. */
+ private Set<Integer> cachesIds;
+
+ /** Applier instance responsible for applying individual CDC events to
PostgreSQL. */
+ private IgniteToPostgreSqlCdcApplier applier;
+
+ /** Count of events applied to PostgreSQL. */
+ private AtomicLongMetric evtsCnt;
+
+ /** Timestamp of last applied batch to PostgreSQL. */
+ private AtomicLongMetric lastEvtTs;
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Validates the configuration, resolves cache names to cache ids, creates the applier and registers the metrics.
+     */
+    @Override public void start(MetricRegistry reg) {
+        A.notNull(dataSrc, "dataSource");
+        A.notEmpty(caches, "caches");
+        A.ensure(batchSize > 0, "batchSize");
+
+        cachesIds = caches.stream().map(CU::cacheId).collect(Collectors.toSet());
+
+        applier = new IgniteToPostgreSqlCdcApplier(dataSrc, batchSize, log);
+
+        MetricRegistryImpl metrics = (MetricRegistryImpl)reg;
+
+        evtsCnt = metrics.longMetric(EVTS_SENT_CNT, EVTS_SENT_CNT_DESC);
+        lastEvtTs = metrics.longMetric(LAST_EVT_SENT_TIME, LAST_EVT_SENT_TIME_DESC);
+
+        if (log.isInfoEnabled())
+            log.info("CDC Ignite to PostgreSQL start-up [cacheIds=" + cachesIds + ']');
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Events of caches that are not configured for replication are skipped, as are non-primary events when
+     * {@code onlyPrimary} is set. The remaining events are passed to the applier and the metrics are updated.
+     */
+    @Override public boolean onEvents(Iterator<CdcEvent> events) {
+        Iterator<CdcEvent> replicated = F.iterator(
+            events,
+            F.identity(),
+            true,
+            evt -> !onlyPrimary || evt.primary(),
+            evt -> cachesIds.contains(evt.cacheId()));
+
+        long evtsSent = applier.applyEvents(replicated);
+
+        if (evtsSent <= 0)
+            return true;
+
+        evtsCnt.add(evtsSent);
+        lastEvtTs.value(System.currentTimeMillis());
+
+        // The logged value is the running total kept by the metric, not the size of this call's batch.
+        if (log.isInfoEnabled())
+            log.info("Events applied [evtsApplied=" + evtsCnt.value() + ']');
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onTypes(Iterator<BinaryType> types) {
+        // Binary type events are not handled by this consumer: the iterator is only drained.
+        while (types.hasNext())
+            types.next();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onMappings(Iterator<TypeMapping> mappings) {
+        // Type mapping events are not handled by this consumer: the iterator is only drained.
+        while (mappings.hasNext())
+            mappings.next();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Only events of the configured caches are passed to the applier, which registers their SQL queries and, when
+     * {@code createTables} is set, attempts to create the corresponding tables.
+     */
+    @Override public void onCacheChange(Iterator<CdcCacheEvent> cacheEvents) {
+        Iterator<CdcCacheEvent> replicated = F.iterator(
+            cacheEvents,
+            F.identity(),
+            true,
+            evt -> cachesIds.contains(evt.cacheId()));
+
+        long tablesCreated = applier.applyCacheEvents(replicated, createTables);
+
+        boolean report = createTables && tablesCreated > 0 && log.isInfoEnabled();
+
+        if (report)
+            log.info("Cache changes applied [tablesCreatedCnt=" + tablesCreated + ']');
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onCacheDestroy(Iterator<Integer> caches) {
+        // Cache destroy events are not handled by this consumer: the iterator is only drained.
+        while (caches.hasNext())
+            caches.next();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop() {
+        // No-op.
+    }
+
+ /**
+ * Sets the datasource configuration for connecting to the PostgreSQL
database.
+ *
+ * @param dataSrc Configured data source.
+ * @return {@code this} for chaining.
+ */
+ public IgniteToPostgreSqlCdcConsumer setDataSource(DataSource dataSrc) {
+ this.dataSrc = dataSrc;
+
+ return this;
+ }
+
+ /**
+ * Sets cache names to replicate.
+ *
+ * @param caches Cache names.
+ * @return {@code this} for chaining.
+ */
+ public IgniteToPostgreSqlCdcConsumer setCaches(Set<String> caches) {
+ this.caches = Collections.unmodifiableSet(caches);
+
+ return this;
+ }
+
+ /**
+ * Enables/disables filtering to accept only primary-node originated
events.
+ *
+ * @param onlyPrimary True to restrict replication to primary events only.
+ * @return {@code this} for chaining.
+ */
+ public IgniteToPostgreSqlCdcConsumer setOnlyPrimary(boolean onlyPrimary) {
+ this.onlyPrimary = onlyPrimary;
+
+ return this;
+ }
+
+    /**
+     * Sets the batch size that will be submitted to PostgreSQL.
+     * <p>
+     * This setting controls how many statements are sent in a single
+     * {@link java.sql.PreparedStatement#executeBatch()} call.
+     *
+     * @param batchSize number of statements per batch, must be positive.
+     * @return {@code this} for chaining.
+     */
+ public IgniteToPostgreSqlCdcConsumer setBatchSize(int batchSize) {
+ this.batchSize = batchSize;
+
+ return this;
+ }
+
+    /**
+     * Enables/disables creation of tables by this instance.
+     *
+     * @param createTables True to attempt creating missing tables while cache change events are processed.
+     * @return {@code this} for chaining.
+     */
+ public IgniteToPostgreSqlCdcConsumer setCreateTables(boolean createTables)
{
+ this.createTables = createTables;
+
+ return this;
+ }
+}
diff --git
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java
index 927a7b25..ef4e1400 100644
---
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java
+++
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java
@@ -191,7 +191,7 @@ public class CdcKafkaReplicationAppsTest extends
CdcKafkaReplicationTest {
}
/** */
- private String prepareConfig(String path, Map<String, String> params) {
+ public static String prepareConfig(String path, Map<String, String>
params) {
try {
String cfg = new
String(Files.readAllBytes(Paths.get(CdcKafkaReplicationAppsTest.class.getResource(path).toURI())));
diff --git
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgres/CdcPostgreSqlReplicationAbstractTest.java
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgres/CdcPostgreSqlReplicationAbstractTest.java
new file mode 100644
index 00000000..57da44b5
--- /dev/null
+++
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgres/CdcPostgreSqlReplicationAbstractTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.postgres;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Set;
+import javax.sql.DataSource;
+import io.zonky.test.db.postgres.embedded.EmbeddedPostgres;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cdc.CdcConfiguration;
+import org.apache.ignite.cdc.postgresql.IgniteToPostgreSqlCdcConsumer;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+
+/** Base class with shared helpers for Ignite-to-PostgreSQL CDC replication tests. */
+public class CdcPostgreSqlReplicationAbstractTest extends
GridCommonAbstractTest {
+ /** Batch size passed to the CDC consumer under test. */
+ protected static final int BATCH_SIZE = 128;
+
+ /** Number of keys used by data replication tests. */
+ protected static final int KEYS_CNT = 1024;
+
+    /**
+     * Executes an SQL statement on the given Ignite node and drains the resulting cursor.
+     *
+     * @param src Ignite node to run the statement on.
+     * @param sqlText SQL text.
+     * @param args Positional arguments of the statement.
+     */
+    protected void executeOnIgnite(IgniteEx src, String sqlText, Object... args) {
+        SqlFieldsQuery sqlQry = new SqlFieldsQuery(sqlText);
+
+        sqlQry.setArgs(args);
+
+        try (FieldsQueryCursor<List<?>> cur = src.context().query().querySqlFields(sqlQry, true)) {
+            cur.getAll();
+        }
+    }
+
+ /**
+ * Runs a query on the embedded PostgreSQL and returns its result set.
+ * <p>
+ * The connection is opened in a try-with-resources block, so it is already closed when the
+ * result set reaches the caller; the statement is never closed explicitly.
+ * NOTE(review): reading the returned result set presumably relies on the JDBC driver having
+ * buffered all rows before the connection was closed - confirm.
+ *
+ * @param postgres Embedded PostgreSQL instance.
+ * @param qry Query text.
+ * @return Result set of the query.
+ * @throws IgniteException If an {@link SQLException} is raised.
+ */
+ protected ResultSet selectOnPostgreSql(EmbeddedPostgres postgres, String
qry) {
+ try (Connection conn = postgres.getPostgresDatabase().getConnection())
{
+ PreparedStatement stmt = conn.prepareStatement(qry);
+
+ return stmt.executeQuery();
+ }
+ catch (SQLException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+    /**
+     * Executes an update statement (DDL or DML) on the embedded PostgreSQL.
+     *
+     * @param postgres Embedded PostgreSQL instance.
+     * @param qry Statement text.
+     * @throws IgniteException If an {@link SQLException} is raised.
+     */
+    protected void executeOnPostgreSql(EmbeddedPostgres postgres, String qry) {
+        // Both resources are declared in the try header so that the statement is closed
+        // explicitly (before the connection) instead of being left open.
+        try (Connection conn = postgres.getPostgresDatabase().getConnection();
+             PreparedStatement stmt = conn.prepareStatement(qry)) {
+            stmt.executeUpdate();
+        }
+        catch (SQLException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * Checks that the first row selected by the condition holds the expected column value.
+     *
+     * @param postgres Embedded PostgreSQL instance.
+     * @param tableName Table to read from.
+     * @param columnName Column whose value is compared.
+     * @param expected Expected column value.
+     * @param condition SQL condition placed after {@code WHERE}.
+     * @return {@code true} if a row was found and its value equals {@code expected}.
+     * @throws IgniteException If the query or reading its result fails.
+     */
+    protected boolean checkRow(
+        EmbeddedPostgres postgres,
+        String tableName,
+        String columnName,
+        String expected,
+        String condition
+    ) {
+        String selectQry = "SELECT " + columnName + " FROM " + tableName + " WHERE " + condition;
+
+        try (ResultSet rs = selectOnPostgreSql(postgres, selectQry)) {
+            // A missing row counts as a mismatch.
+            return rs.next() && expected.equals(rs.getString(columnName));
+        }
+        catch (Exception e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * Builds a predicate that holds once a table exists in PostgreSQL for every given cache.
+     * Cache names are lower-cased before being looked up in {@code information_schema.tables}.
+     *
+     * @param postgres Embedded PostgreSQL instance.
+     * @param caches Cache names whose tables are expected.
+     * @return Predicate suitable for polling.
+     */
+    protected GridAbsPredicate waitForTablesCreatedOnPostgres(EmbeddedPostgres postgres, Set<String> caches) {
+        String existsQryTemplate = "SELECT EXISTS (" +
+            " SELECT 1 FROM information_schema.tables " +
+            " WHERE table_name = '%s'" +
+            ")";
+
+        return () -> {
+            for (String cacheName : caches) {
+                String existsQry = String.format(existsQryTemplate, cacheName.toLowerCase());
+
+                try (ResultSet tblExists = selectOnPostgreSql(postgres, existsQry)) {
+                    tblExists.next();
+
+                    // One missing table is enough to report "not ready yet".
+                    if (!tblExists.getBoolean(1))
+                        return false;
+                }
+                catch (SQLException e) {
+                    log.error(e.getMessage(), e);
+
+                    throw new IgniteException(e);
+                }
+            }
+
+            return true;
+        };
+    }
+
+    /**
+     * Builds a predicate that holds when the table contains exactly the expected number of rows.
+     *
+     * @param postgres Embedded PostgreSQL instance.
+     * @param tableName Table to count rows in.
+     * @param expSz Expected row count.
+     * @return Predicate suitable for polling.
+     */
+    protected GridAbsPredicate waitForTableSize(EmbeddedPostgres postgres, String tableName, long expSz) {
+        String cntQry = "SELECT COUNT(*) FROM " + tableName;
+
+        return () -> {
+            try (ResultSet rs = selectOnPostgreSql(postgres, cntQry)) {
+                rs.next();
+
+                return rs.getLong(1) == expSz;
+            }
+            catch (SQLException e) {
+                throw new IgniteException(e);
+            }
+        };
+    }
+
+ /**
+ * Creates the CDC consumer with the default test settings: {@link #BATCH_SIZE} as batch size,
+ * primary-only mode enabled and table creation disabled.
+ *
+ * @return CDC consumer configured for tests.
+ */
+ protected IgniteToPostgreSqlCdcConsumer getCdcConsumerConfiguration() {
+ return new IgniteToPostgreSqlCdcConsumer()
+ .setBatchSize(BATCH_SIZE)
+ .setOnlyPrimary(true)
+ .setCreateTables(false);
+ }
+
+    /**
+     * Starts the CDC application with the PostgreSQL consumer in a separate thread.
+     *
+     * @param igniteCfg Ignite configuration.
+     * @param caches Cache name set to stream to PostgreSql.
+     * @param dataSrc Data Source.
+     * @return Future for Change Data Capture application.
+     */
+    protected IgniteInternalFuture<?> startIgniteToPostgreSqlCdcConsumer(
+        IgniteConfiguration igniteCfg,
+        Set<String> caches,
+        DataSource dataSrc
+    ) {
+        IgniteToPostgreSqlCdcConsumer consumer = getCdcConsumerConfiguration();
+
+        consumer.setCaches(caches).setDataSource(dataSrc);
+
+        CdcConfiguration cdcCfg = new CdcConfiguration();
+
+        cdcCfg.setConsumer(consumer);
+
+        CdcMain cdcMain = new CdcMain(igniteCfg, null, cdcCfg);
+
+        return runAsync(cdcMain, "ignite-src-to-postgres-" + igniteCfg.getConsistentId());
+    }
+}
diff --git
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgres/CdcPostgreSqlReplicationAppsTest.java
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgres/CdcPostgreSqlReplicationAppsTest.java
new file mode 100644
index 00000000..d7c39555
--- /dev/null
+++
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgres/CdcPostgreSqlReplicationAppsTest.java
@@ -0,0 +1,45 @@
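+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.postgres;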
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import javax.sql.DataSource;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.startup.cmdline.CdcCommandLineStartup;
+
+import static
org.apache.ignite.cdc.kafka.CdcKafkaReplicationAppsTest.prepareConfig;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+
+/** PostgreSql CDC test with .xml configuration. */
+public class CdcPostgreSqlReplicationAppsTest extends
CdcPostgreSqlReplicationTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteInternalFuture<?>
startIgniteToPostgreSqlCdcConsumer(
+ IgniteConfiguration igniteCfg,
+ Set<String> caches,
+ DataSource dataSrc
+ ) {
+ String cfgPath = "/replication/ignite-to-postgres.xml";
+ String threadName = "ignite-src-to-postgres-xml";
+
+ Map<String, String> params = new HashMap<>();
+
+ params.put("INSTANCE_NAME", igniteCfg.getIgniteInstanceName());
+ params.put("CONSISTENT_ID",
String.valueOf(igniteCfg.getConsistentId()));
+
+ params.put("BATCH_SIZE", Integer.toString(BATCH_SIZE));
+ params.put("ONLY_PRIMARY", "true");
+ params.put("CREATE_TABLE", String.valueOf(createTables));
+
+ params.put("DB_URL", postgres.getJdbcUrl("postgres", "postgres"));
+ params.put("DB_USER", "postgres");
+ params.put("DB_PASSWORD", "");
+
+ return runAsync(() -> CdcCommandLineStartup.main(new String[]
{prepareConfig(cfgPath, params)}), threadName);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void checkFutureEndWithError(IgniteInternalFuture<?>
fut) {
+ // CdcMain error in CdcCommandLineStartup leads to 'System.exit(-1)'
without showing error/cancellation in fut
+ }
+}
diff --git
a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgres/CdcPostgreSqlReplicationTest.java
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgres/CdcPostgreSqlReplicationTest.java
new file mode 100644
index 00000000..468ff4df
--- /dev/null
+++
b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/postgres/CdcPostgreSqlReplicationTest.java
@@ -0,0 +1,699 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.postgres;
+
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.function.IntConsumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import io.zonky.test.db.postgres.embedded.EmbeddedPostgres;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cdc.postgresql.IgniteToPostgreSqlCdcConsumer;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
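+/**
+ * Tests replication of Ignite tables and caches to an embedded PostgreSQL with {@link IgniteToPostgreSqlCdcConsumer}.
+ * Parameterized by cache atomicity mode and by the {@code createTables} flag of the consumer.
+ */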
+@RunWith(Parameterized.class)
+public class CdcPostgreSqlReplicationTest extends
CdcPostgreSqlReplicationAbstractTest {
+ /** Number of backups configured for tables created by the test. */
+ private static final int BACKUP = 0;
+
+ /** Cache template used for tables created by the test. */
+ private static final String CACHE_MODE = "PARTITIONED";
+
+ /** Atomicity mode of tables created by the test. */
+ @Parameterized.Parameter()
+ public CacheAtomicityMode atomicity;
+
+ /** Whether the CDC consumer creates PostgreSQL tables itself; if {@code false}, the test creates them. */
+ @Parameterized.Parameter(1)
+ public boolean createTables;
+
+    /** @return Test parameters: every combination of atomicity mode and {@code createTables} flag. */
+    @Parameterized.Parameters(name = "atomicity={0}, createTables={1}")
+    public static Collection<?> parameters() {
+        List<Object[]> combinations = new ArrayList<>();
+
+        for (CacheAtomicityMode mode : EnumSet.of(ATOMIC, TRANSACTIONAL)) {
+            combinations.add(new Object[] {mode, true});
+            combinations.add(new Object[] {mode, false});
+        }
+
+        return combinations;
+    }
+
+ /** Source Ignite node, started in {@link #beforeTest()}. */
+ protected IgniteEx src;
+
+ /** Embedded PostgreSQL instance, started in {@link #beforeTest()} and closed in {@link #afterTest()}. */
+ protected EmbeddedPostgres postgres;
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Enables persistence and CDC on the default data region and uses the instance name as consistent id.
+     */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration igniteCfg = super.getConfiguration(igniteInstanceName);
+
+        igniteCfg.setDataStorageConfiguration(new DataStorageConfiguration()
+            .setWalForceArchiveTimeout(5_000)
+            .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                .setPersistenceEnabled(true)
+                .setCdcEnabled(true)));
+
+        igniteCfg.setConsistentId(igniteInstanceName);
+
+        return igniteCfg;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Applies the {@link #createTables} test parameter on top of the base settings.
+     */
+    @Override protected IgniteToPostgreSqlCdcConsumer getCdcConsumerConfiguration() {
+        return super.getCdcConsumerConfiguration().setCreateTables(createTables);
+    }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Cleans the persistence directory, starts and activates the source node and starts the embedded PostgreSQL.
+ */
+ @Override protected void beforeTest() throws Exception {
+ cleanPersistenceDir();
+
+ src = startGrid(0);
+
+ src.cluster().state(ClusterState.ACTIVE);
+
+ postgres = EmbeddedPostgres.builder().start();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+
+ postgres.close();
+ }
+
+ /** Checks replication of a single-column-key table whose primary key column is declared first. */
+ @Test
+ public void testSingleColumnKeyDataReplicationWithPrimaryFirst() throws
Exception {
+ testSingleColumnKeyDataReplication(false);
+ }
+
+ /** Checks replication of a single-column-key table whose primary key column is declared last. */
+ @Test
+ public void testSingleColumnKeyDataReplicationWithPrimaryLast() throws
Exception {
+ testSingleColumnKeyDataReplication(true);
+ }
+
+ /**
+ * Creates table {@code T1} with a single-column primary key and runs the common replication scenario on it.
+ * Inserted names have the form {@code "Name" + id}, updated names the form {@code id + "Name"}.
+ *
+ * @param isPrimaryLast If {@code true}, the {@code ID} primary key column is declared after {@code NAME},
+ * otherwise before it; statement arguments are ordered accordingly.
+ * @throws Exception If failed.
+ */
+ public void testSingleColumnKeyDataReplication(boolean isPrimaryLast)
throws Exception {
+ String[] tableFields;
+
+ String insertQry = "INSERT INTO T1 VALUES(?, ?)";
+ String updateQry;
+
+ IntConsumer insert;
+ IntConsumer update;
+
+ if (isPrimaryLast) {
+ tableFields = new String[] {"NAME VARCHAR(20)", "ID BIGINT PRIMARY
KEY"};
+
+ updateQry = "MERGE INTO T1 (NAME, ID) VALUES (?, ?)";
+
+ insert = id -> executeOnIgnite(src, insertQry, "Name" + id, id);
+ update = id -> executeOnIgnite(src, updateQry, id + "Name", id);
+ }
+ else {
+ tableFields = new String[] {"ID BIGINT PRIMARY KEY", "NAME
VARCHAR(20)"};
+
+ updateQry = "MERGE INTO T1 (ID, NAME) VALUES (?, ?)";
+
+ insert = id -> executeOnIgnite(src, insertQry, id, "Name" + id);
+ update = id -> executeOnIgnite(src, updateQry, id, id + "Name");
+ }
+
+ createTable("T1", tableFields, null, null, null);
+
+ Supplier<Boolean> checkInsert = () -> checkSingleColumnKeyTable(id ->
"Name" + id);
+
+ Supplier<Boolean> checkUpdate = () -> checkSingleColumnKeyTable(id ->
id + "Name");
+
+ testDataReplication("T1", insert, checkInsert, update, checkUpdate);
+ }
+
+    /**
+     * Verifies the content of PostgreSQL table {@code T1}: rows ordered by {@code ID} must have
+     * consecutive ids starting from 0, the expected names and {@link #KEYS_CNT} rows in total.
+     *
+     * @param idToName Maps a row id to the name expected for it.
+     * @return {@code true} if the table content matches.
+     * @throws IgniteException If the query or reading its result fails.
+     */
+    private boolean checkSingleColumnKeyTable(Function<Long, String> idToName) {
+        try (ResultSet rs = selectOnPostgreSql(postgres, "SELECT ID, NAME FROM T1 ORDER BY ID")) {
+            long rowIdx = 0;
+
+            while (rs.next()) {
+                long rowId = rs.getLong("ID");
+                String rowName = rs.getString("NAME");
+
+                boolean rowOk = idToName.apply(rowId).equals(rowName) && rowIdx == rowId;
+
+                if (!rowOk)
+                    return false;
+
+                rowIdx++;
+            }
+
+            return rowIdx == KEYS_CNT;
+        }
+        catch (Exception e) {
+            throw new IgniteException(e);
+        }
+    }
+
+ /**
+ * Replication with complex SQL key. Data inserted via SQL.
+ * Rows of table {@code T2} are inserted with {@code INSERT} and updated with {@code MERGE}.
+ */
+ @Test
+ public void testMultiColumnKeyDataReplicationWithSql() throws Exception {
+ IntConsumer insert = id -> executeOnIgnite(
+ src,
+ "INSERT INTO T2 (ID, SUBID, NAME, VAL) VALUES(?, ?, ?, ?)",
+ id,
+ "SUBID",
+ "Name" + id,
+ id * 42
+ );
+
+ IntConsumer update = id -> executeOnIgnite(
+ src,
+ "MERGE INTO T2 (ID, SUBID, NAME, VAL) VALUES(?, ?, ?, ?)",
+ id,
+ "SUBID",
+ id + "Name",
+ id + 42
+ );
+
+ testMultiColumnKeyDataReplication("T2", insert, update);
+ }
+
+ /**
+ * Replication with complex SQL key. Data inserted via key-value API.
+ * Entries are put into cache {@code T3} as {@link TestKey}/{@link TestVal} pairs.
+ */
+ @Test
+ public void testMultiColumnKeyDataReplicationWithKeyValue() throws
Exception {
+ IntConsumer insert = id -> src.cache("T3")
+ .put(
+ new TestKey(id, "SUBID"),
+ new TestVal("Name" + id, id * 42)
+ );
+
+ IntConsumer update = id -> src.cache("T3")
+ .put(
+ new TestKey(id, "SUBID"),
+ new TestVal(id + "Name", id + 42)
+ );
+
+ testMultiColumnKeyDataReplication("T3", insert, update);
+ }
+
+ /**
+ * Creates a table with the composite primary key {@code (ID, SUBID)} backed by {@link TestKey} and
+ * {@link TestVal} types and runs the common replication scenario on it.
+ *
+ * @param tableName Table (and cache) name.
+ * @param insert Action inserting the row with the given id.
+ * @param update Action updating the row with the given id.
+ * @throws Exception If failed.
+ */
+ public void testMultiColumnKeyDataReplication(String tableName,
IntConsumer insert, IntConsumer update) throws Exception {
+ String[] tableFields = new String[] {
+ "ID INT NOT NULL",
+ "SUBID VARCHAR(15) NOT NULL",
+ "NAME VARCHAR",
+ "VAL INT"
+ };
+
+ String constraint = "PRIMARY KEY (ID, SUBID)";
+
+ createTable(tableName, tableFields, constraint,
TestKey.class.getName(), TestVal.class.getName());
+
+ Supplier<Boolean> checkInsert = () ->
checkMultiColumnKeyTable(tableName, id -> "Name" + id, id -> id * 42);
+
+ Supplier<Boolean> checkUpdate = () ->
checkMultiColumnKeyTable(tableName, id -> id + "Name", id -> id + 42);
+
+ testDataReplication(tableName, insert, checkInsert, update,
checkUpdate);
+ }
+
+    /**
+     * Verifies the content of a PostgreSQL table with {@code ID}, {@code NAME} and {@code VAL} columns:
+     * rows ordered by {@code ID} must have consecutive ids starting from 0, the expected names and
+     * values, and {@link #KEYS_CNT} rows in total.
+     *
+     * @param tableName Table to check.
+     * @param idToName Maps a row id to the name expected for it.
+     * @param idToVal Maps a row id to the value expected for it.
+     * @return {@code true} if the table content matches.
+     * @throws IgniteException If the query or reading its result fails.
+     */
+    private boolean checkMultiColumnKeyTable(
+        String tableName,
+        Function<Integer, String> idToName,
+        Function<Integer, Integer> idToVal
+    ) {
+        String selectQry = "SELECT ID, NAME, VAL FROM " + tableName + " ORDER BY ID";
+
+        try (ResultSet rs = selectOnPostgreSql(postgres, selectQry)) {
+            long rowIdx = 0;
+
+            while (rs.next()) {
+                int rowId = rs.getInt("ID");
+                String rowName = rs.getString("NAME");
+                int rowVal = rs.getInt("VAL");
+
+                boolean rowOk = idToVal.apply(rowId).equals(rowVal)
+                    && idToName.apply(rowId).equals(rowName)
+                    && rowIdx == rowId;
+
+                if (!rowOk)
+                    return false;
+
+                rowIdx++;
+            }
+
+            return rowIdx == KEYS_CNT;
+        }
+        catch (Exception e) {
+            throw new IgniteException(e);
+        }
+    }
+
+ /**
+ * Common replication scenario. Starts CDC for the table, then:
+ * inserts {@link #KEYS_CNT} rows and checks them; deletes all rows and waits for the empty table;
+ * inserts the rows again; finally applies updates in which every key occurs twice in random order
+ * and waits until the update check passes. The CDC future is cancelled at the end in any case.
+ *
+ * @param tableName Table (and cache) name.
+ * @param insert Action inserting the row with the given id.
+ * @param checkInsert Check of the table content after inserts.
+ * @param update Action updating the row with the given id.
+ * @param checkUpdate Check of the table content after updates.
+ * @throws Exception If failed.
+ */
+ private void testDataReplication(
+ String tableName,
+ IntConsumer insert,
+ Supplier<Boolean> checkInsert,
+ IntConsumer update,
+ Supplier<Boolean> checkUpdate
+ ) throws Exception {
+ IgniteInternalFuture<?> fut =
startCdc(Stream.of(tableName).collect(Collectors.toSet()));
+
+ try {
+ IntStream.range(0, KEYS_CNT).forEach(insert);
+
+ assertTrue(waitForCondition(waitForTableSize(postgres, tableName,
KEYS_CNT), getTestTimeout()));
+
+ assertTrue(checkInsert.get());
+
+ executeOnIgnite(src, "DELETE FROM " + tableName);
+
+ assertTrue(waitForCondition(waitForTableSize(postgres, tableName,
0), getTestTimeout()));
+
+ IntStream.range(0, KEYS_CNT).forEach(insert);
+
+ assertTrue(waitForCondition(waitForTableSize(postgres, tableName,
KEYS_CNT), getTestTimeout()));
+
+ rangeWithDuplicates(0, KEYS_CNT).forEach(update);
+
+ assertTrue(waitForCondition(checkUpdate::get, getTestTimeout()));
+ }
+ finally {
+ fut.cancel();
+ }
+ }
+
+    /**
+     * Produces every integer of the range twice, in random order.
+     *
+     * @param startInclusive Start inclusive.
+     * @param endExclusive End exclusive.
+     * @return Shuffled stream that contains each value of the range exactly two times.
+     */
+    private IntStream rangeWithDuplicates(int startInclusive, int endExclusive) {
+        List<Integer> keys = new ArrayList<>();
+
+        // Two passes over the range give every key exactly two occurrences.
+        for (int pass = 0; pass < 2; pass++) {
+            for (int key = startInclusive; key < endExclusive; key++)
+                keys.add(key);
+        }
+
+        Collections.shuffle(keys);
+
+        return keys.stream().mapToInt(Integer::intValue);
+    }
+
+ /**
+ * Checks that one CDC consumer replicates several tables ({@code T4}, {@code T5}, {@code T6}) at once:
+ * inserts and updates are issued on all three tables, then row counts and individual rows are verified
+ * in PostgreSQL. The CDC future is cancelled at the end in any case.
+ */
+ @Test
+ public void testMultipleTableDataReplication() throws Exception {
+ String[] tableFields = new String[] {"ID BIGINT PRIMARY KEY", "NAME
VARCHAR"};
+
+ createTable("T4", tableFields, null, null, null);
+ createTable("T5", tableFields, null, null, null);
+ createTable("T6", tableFields, null, null, null);
+
+ IgniteInternalFuture<?> fut = startCdc(Stream.of("T4", "T5",
"T6").collect(Collectors.toSet()));
+
+ try {
+ String insertQry = "INSERT INTO %s VALUES(?, ?)";
+ String updateQry = "MERGE INTO %s (ID, NAME) VALUES (?, ?)";
+
+ executeOnIgnite(src, String.format(insertQry, "T4"), 1, "Name" +
1);
+
+ assertTrue(waitForCondition(waitForTableSize(postgres, "T4", 1),
getTestTimeout()));
+
+ executeOnIgnite(src, String.format(updateQry, "T4"), 1, "Name" +
2);
+ executeOnIgnite(src, String.format(insertQry, "T4"), 3, "Name" +
1);
+ executeOnIgnite(src, String.format(insertQry, "T5"), 4, "Name" +
1);
+ executeOnIgnite(src, String.format(insertQry, "T6"), 5, "Name" +
5);
+ executeOnIgnite(src, String.format(insertQry, "T6"), 6, "Name" +
6);
+ executeOnIgnite(src, String.format(updateQry, "T6"), 5, 5 +
"Name");
+
+ assertTrue(waitForCondition(waitForTableSize(postgres, "T4", 2),
getTestTimeout()));
+ assertTrue(waitForCondition(waitForTableSize(postgres, "T5", 1),
getTestTimeout()));
+ assertTrue(waitForCondition(waitForTableSize(postgres, "T6", 2),
getTestTimeout()));
+
+ assertTrue(checkRow(postgres, "T4", "NAME", "Name" + 2, "ID=1"));
+ assertTrue(checkRow(postgres, "T4", "NAME", "Name" + 1, "ID=3"));
+ assertTrue(checkRow(postgres, "T5", "NAME", "Name" + 1, "ID=4"));
+ assertTrue(checkRow(postgres, "T6", "NAME", 5 + "Name", "ID=5"));
+ assertTrue(checkRow(postgres, "T6", "NAME", "Name" + 6, "ID=6"));
+ }
+ finally {
+ fut.cancel();
+ }
+ }
+
+ /**
+ * Starts the CDC consumer for the given caches and waits until the corresponding tables
+ * exist in PostgreSQL.
+ *
+ * @param caches Cache names to replicate.
+ * @return Future of the CDC application.
+ * @throws IgniteInterruptedCheckedException If waiting was interrupted.
+ */
+ private IgniteInternalFuture<?> startCdc(Set<String> caches) throws
IgniteInterruptedCheckedException {
+ IgniteInternalFuture<?> fut =
startIgniteToPostgreSqlCdcConsumer(src.configuration(), caches,
postgres.getPostgresDatabase());
+
+ assertTrue(waitForCondition(waitForTablesCreatedOnPostgres(postgres,
caches), getTestTimeout()));
+
+ return fut;
+ }
+
+    /**
+     * Creates the table on the Ignite node and, when the consumer is not configured to create tables,
+     * also in PostgreSQL (there with an additional {@code version BYTEA NOT NULL} column).
+     *
+     * @param tableName Table name, also used as the cache name.
+     * @param fields Column declarations; must not be empty.
+     * @param constraint Optional table constraint, may be {@code null}.
+     * @param keyClsName Optional key type name, may be {@code null}.
+     * @param valClsName Optional value type name, may be {@code null}.
+     */
+    private void createTable(String tableName, String[] fields, String constraint, String keyClsName, String valClsName) {
+        A.notEmpty(fields, "Empty fields declaration.");
+
+        String columns = String.join(",", fields);
+
+        String constraintQry = constraint == null ? "" : ", " + constraint;
+
+        String keyTypeArg = keyClsName == null ? "" : "KEY_TYPE=" + keyClsName + ",";
+        String valTypeArg = valClsName == null ? "" : "VALUE_TYPE=" + valClsName + ",";
+
+        String igniteDdl = "CREATE TABLE IF NOT EXISTS " + tableName +
+            " (" + columns + constraintQry + ")" +
+            " WITH \"CACHE_NAME=" + tableName + "," +
+            keyTypeArg +
+            valTypeArg +
+            "ATOMICITY=" + atomicity.name() + "," +
+            "BACKUPS=" + BACKUP + "," +
+            "TEMPLATE=" + CACHE_MODE + "\";";
+
+        executeOnIgnite(src, igniteDdl);
+
+        if (createTables)
+            return;
+
+        // The consumer will not create the table, so the test prepares it in PostgreSQL.
+        executeOnPostgreSql(postgres, "CREATE TABLE IF NOT EXISTS " + tableName +
+            " (" + columns + ", version BYTEA NOT NULL" + constraintQry + ")");
+    }
+
+ /**
+ * Checks that the CDC application ends with an error for two query entities, neither of which
+ * declares a key field name or key fields: one defines only a value type, the other only
+ * a value field name with query fields.
+ */
+ @Test
+ public void testQueryEntityError() throws IgniteCheckedException {
+ QueryEntity qryOnlyValType = new QueryEntity()
+ .setTableName("qryOnlyValType")
+ .setValueType("org.apache.ignite.cdc.postgres.TestVal");
+
+ testQueryEntityReplicationError(qryOnlyValType);
+
+ QueryEntity qryOnlyValName = new QueryEntity()
+ .setTableName("qryOnlyValName")
+ .setValueFieldName("name")
+ .addQueryField("name", String.class.getName(), null)
+ .addQueryField("val", Integer.class.getName(), null);
+
+ testQueryEntityReplicationError(qryOnlyValName);
+ }
+
+ /**
+ * Creates a cache with the given query entity, starts the CDC consumer for it, waits for the CDC
+ * future to complete and passes the future to {@link #checkFutureEndWithError(IgniteInternalFuture)}.
+ * The result of the wait itself is not asserted.
+ *
+ * @param qryEntity Query entity expected to make replication fail.
+ * @throws IgniteInterruptedCheckedException If waiting was interrupted.
+ */
+ private void testQueryEntityReplicationError(QueryEntity qryEntity) throws
IgniteInterruptedCheckedException {
+ CacheConfiguration<Integer, TestVal> ccfg = new
CacheConfiguration<Integer, TestVal>(qryEntity.getTableName())
+ .setQueryEntities(Collections.singletonList(qryEntity));
+
+ src.getOrCreateCache(ccfg);
+
+ IgniteInternalFuture<?> fut = startIgniteToPostgreSqlCdcConsumer(
+ src.configuration(),
+ new HashSet<>(Collections.singletonList(qryEntity.getTableName())),
+ postgres.getPostgresDatabase()
+ );
+
+ waitForCondition(fut::isDone, getTestTimeout());
+
+ checkFutureEndWithError(fut);
+ }
+
+ /**
+ * Asserts that the CDC future carries an error.
+ *
+ * @param fut Future of the CDC application.
+ */
+ protected void checkFutureEndWithError(IgniteInternalFuture<?> fut) {
+ assertTrue(fut.error() != null);
+ }
+
+ /**
+ * Checks replication of a cache whose query entity declares both a key field name ({@code id})
+ * and a value field name ({@code name}). One entry with an {@code Integer} key is put and the
+ * resulting PostgreSQL row is compared with the expected {@code id}, {@code name} and {@code val}.
+ */
+ @Test
+ public void testQueryEntityWithKeyValueFieldNames() throws
IgniteCheckedException {
+ QueryEntity qryEntity = new QueryEntity()
+ .setTableName("qryKeyValName")
+ .setKeyFieldName("id")
+ .setValueFieldName("name")
+ .addQueryField("id", Integer.class.getName(), null)
+ .addQueryField("name", String.class.getName(), null)
+ .addQueryField("val", Integer.class.getName(), null);
+
+ int id = 2;
+
+ String name = "1";
+ int val = 3;
+
+ Function<ResultSet, Boolean> check = res -> {
+ try {
+ assertTrue(res.next());
+
+ int actId = res.getInt("id");
+ String actName = res.getString("name");
+ int actVal = res.getInt("val");
+
+ return actId == id && Objects.equals(actName, name) && actVal
== val;
+ }
+ catch (Exception e) {
+ return false;
+ }
+ };
+
+ String[] tableFields = new String[] {
+ "ID INT NOT NULL",
+ "NAME VARCHAR",
+ "VAL INT"
+ };
+
+ String constraint = "PRIMARY KEY (ID)";
+
+ testQueryEntityReplicationSuccess(qryEntity, tableFields, constraint,
id, new TestVal(name, val), check);
+ }
+
+ /**
+ * Checks replication of a cache whose query entity declares a set of key fields
+ * ({@code id}, {@code subId}) and a value field name ({@code name}). One entry with a
+ * {@link TestKey} key is put and the resulting PostgreSQL row is compared with the expected values.
+ */
+ @Test
+ public void testQueryEntityWithKeyFieldsAndValName() throws
IgniteCheckedException {
+ QueryEntity qryEntity = new QueryEntity()
+ .setTableName("qryKeyFieldsValName")
+ .setKeyFields(new HashSet<>(Arrays.asList("id", "subId")))
+ .setValueFieldName("name")
+ .addQueryField("id", Integer.class.getName(), null)
+ .addQueryField("subId", String.class.getName(), null)
+ .addQueryField("name", String.class.getName(), null)
+ .addQueryField("val", Integer.class.getName(), null);
+
+ int id = 4;
+ String subId = "foobar";
+
+ String name = "5";
+ int val = 0;
+
+ Function<ResultSet, Boolean> check = res -> {
+ try {
+ assertTrue(res.next());
+
+ int actId = res.getInt("id");
+ String actSubId = res.getString("subId");
+ String actName = res.getString("name");
+ int actVal = res.getInt("val");
+
+ return actId == id && Objects.equals(actSubId, subId) &&
Objects.equals(actName, name) && actVal == val;
+ }
+ catch (Exception e) {
+ return false;
+ }
+ };
+
+ String[] tableFields = new String[] {
+ "ID INT NOT NULL",
+ "SUBID VARCHAR(15) NOT NULL",
+ "NAME VARCHAR",
+ "VAL INT"
+ };
+
+ String constraint = "PRIMARY KEY (ID, SUBID)";
+
+ testQueryEntityReplicationSuccess(
+ qryEntity,
+ tableFields,
+ constraint,
+ new TestKey(id, subId),
+ new TestVal(name, val),
+ check
+ );
+ }
+
+ /**
+ * Checks replication of a cache whose query entity declares a key field name ({@code id}),
+ * a value type and query fields. One entry with an {@code Integer} key is put and the
+ * resulting PostgreSQL row is compared with the expected {@code id}, {@code name} and {@code val}.
+ */
+ @Test
+ public void testQueryEntityWithKeyNameValueTypeAndFields() throws
IgniteCheckedException {
+ QueryEntity qryEntity = new QueryEntity()
+ .setTableName("TESTING")
+ .setKeyFieldName("id")
+ .setValueType("org.apache.ignite.cdc.postgres.TestVal")
+ .addQueryField("id", Integer.class.getName(), null)
+ .addQueryField("name", String.class.getName(), null)
+ .addQueryField("val", Integer.class.getName(), null);
+
+ int id = 3;
+
+ String name = "test";
+ int val = 9;
+
+ Function<ResultSet, Boolean> check = res -> {
+ try {
+ assertTrue(res.next());
+
+ int actId = res.getInt("id");
+ String actName = res.getString("name");
+ int actVal = res.getInt("val");
+
+ return actId == id && Objects.equals(actName, name) && actVal
== val;
+ }
+ catch (Exception e) {
+ return false;
+ }
+ };
+
+ String[] tableFields = new String[] {
+ "ID INT NOT NULL",
+ "NAME VARCHAR",
+ "VAL INT"
+ };
+
+ String constraint = "PRIMARY KEY (ID)";
+
+ testQueryEntityReplicationSuccess(qryEntity, tableFields, constraint,
id, new TestVal(name, val), check);
+ }
+
+    /**
+     * Creates a cache with the given query entity, puts a single entry into it, starts CDC and
+     * verifies the row replicated to PostgreSQL. When the consumer is not configured to create
+     * tables, the target table is created by the test (with an extra {@code version BYTEA NOT NULL}
+     * column). The CDC future is cancelled and the result set is closed even if a check fails.
+     *
+     * @param qryEntity Query entity of the cache; its table name is also used as the cache name.
+     * @param fields Column declarations for the PostgreSQL table; must not be empty.
+     * @param constraint Optional table constraint, may be {@code null}.
+     * @param key Key of the entry to put.
+     * @param val Value of the entry to put.
+     * @param checkTable Check applied to the result of {@code SELECT *} on the replicated table.
+     * @param <K> Key type.
+     * @param <V> Value type.
+     * @throws IgniteCheckedException If failed.
+     */
+    public <K, V> void testQueryEntityReplicationSuccess(
+        QueryEntity qryEntity,
+        String[] fields,
+        String constraint,
+        K key,
+        V val,
+        Function<ResultSet, Boolean> checkTable
+    ) throws IgniteCheckedException {
+        String tblName = qryEntity.getTableName();
+
+        CacheConfiguration<K, V> ccfg = new CacheConfiguration<K, V>(tblName)
+            .setQueryEntities(Collections.singletonList(qryEntity));
+
+        IgniteCache<K, V> cache = src.getOrCreateCache(ccfg);
+
+        if (!createTables) {
+            A.notEmpty(fields, "Empty fields declaration.");
+
+            String constraintQry = constraint == null ? "" : ", " + constraint;
+
+            executeOnPostgreSql(postgres, "CREATE TABLE IF NOT EXISTS " + tblName +
+                " (" + String.join(",", fields) + ", version BYTEA NOT NULL" + constraintQry + ")");
+        }
+
+        cache.put(key, val);
+
+        IgniteInternalFuture<?> fut = startCdc(new HashSet<>(Collections.singletonList(tblName)));
+
+        try {
+            assertTrue(waitForCondition(waitForTableSize(postgres, tblName, 1), getTestTimeout()));
+
+            try (ResultSet rs = selectOnPostgreSql(postgres, "SELECT * FROM " + tblName)) {
+                assertTrue(checkTable.apply(rs));
+            }
+            catch (java.sql.SQLException e) {
+                // Only closing the result set can raise it here; rethrown unchecked to keep the signature.
+                throw new IgniteException(e);
+            }
+        }
+        finally {
+            // Stop the CDC application even when an assertion above has failed.
+            fut.cancel();
+        }
+    }
+
+ /** Composite cache key made of an integer id and a string sub-id. */
+ private static class TestKey {
+ /** Id part of the key. */
+ private final int id;
+
+ /** Sub-id part of the key. */
+ private final String subId;
+
+ /**
+ * @param id Id part of the key.
+ * @param subId Sub-id part of the key.
+ */
+ public TestKey(int id, String subId) {
+ this.id = id;
+ this.subId = subId;
+ }
+ }
+
+ /** Cache value made of a name and an integer value. */
+ private static class TestVal {
+ /** Name. */
+ private final String name;
+
+ /** Value. */
+ private final int val;
+
+ /**
+ * @param name Name.
+ * @param val Value.
+ */
+ public TestVal(String name, int val) {
+ this.name = name;
+ this.val = val;
+ }
+ }
+}
diff --git
a/modules/cdc-ext/src/test/resources/replication/ignite-to-postgres.xml
b/modules/cdc-ext/src/test/resources/replication/ignite-to-postgres.xml
new file mode 100644
index 00000000..727194d0
--- /dev/null
+++ b/modules/cdc-ext/src/test/resources/replication/ignite-to-postgres.xml
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util.xsd">
+ <bean id="grid.cfg"
class="org.apache.ignite.configuration.IgniteConfiguration">
+ <property name="igniteInstanceName" value="{INSTANCE_NAME}" />
+ <property name="peerClassLoadingEnabled" value="true" />
+ <property name="localHost" value="127.0.0.1" />
+ <property name="consistentId" value="{CONSISTENT_ID}" />
+
+ <property name="dataStorageConfiguration">
+ <bean
class="org.apache.ignite.configuration.DataStorageConfiguration">
+ <property name="defaultDataRegionConfiguration">
+ <bean
class="org.apache.ignite.configuration.DataRegionConfiguration">
+ <property name="cdcEnabled" value="true" />
+ <property name="persistenceEnabled" value="true" />
+ </bean>
+ </property>
+ </bean>
+ </property>
+ </bean>
+
+ <bean id="cdc.cfg" class="org.apache.ignite.cdc.CdcConfiguration">
+ <property name="consumer">
+ <bean
class="org.apache.ignite.cdc.postgresql.IgniteToPostgreSqlCdcConsumer">
+ <property name="caches">
+ <list>
+ <value>T1</value>
+ <value>T2</value>
+ <value>T3</value>
+ <value>T4</value>
+ <value>T5</value>
+ <value>T6</value>
+ <value>qryOnlyValType</value>
+ <value>qryOnlyValName</value>
+ <value>qryKeyValName</value>
+ <value>qryKeyFieldsValName</value>
+ <value>TESTING</value>
+ </list>
+ </property>
+ <property name="batchSize" value="{BATCH_SIZE}" />
+ <property name="onlyPrimary" value="{ONLY_PRIMARY}" />
+ <property name="createTables" value="{CREATE_TABLE}" />
+ <property name="dataSource" ref="dataSource" />
+ </bean>
+ </property>
+ </bean>
+
+ <bean id="dataSource" class="org.apache.commons.dbcp2.BasicDataSource"
destroy-method="close">
+ <property name="driverClassName" value="org.postgresql.Driver"/>
+ <property name="url" value="{DB_URL}"/>
+ <property name="username" value="{DB_USER}"/>
+ <property name="password" value="{DB_PASSWORD}"/>
+
+ <!-- Optional pool configuration -->
+ <property name="initialSize" value="3"/>
+ <property name="maxTotal" value="10"/>
+ <property name="maxIdle" value="3"/>
+ <property name="minIdle" value="1"/>
+ <property name="validationQuery" value="SELECT 1"/>
+ <property name="testOnBorrow" value="true"/>
+ </bean>
+</beans>
diff --git a/parent-internal/pom.xml b/parent-internal/pom.xml
index 9d55fd78..e6f5e701 100644
--- a/parent-internal/pom.xml
+++ b/parent-internal/pom.xml
@@ -40,6 +40,8 @@
<spring61.version>6.1.15</spring61.version>
<activemq.version>5.12.0</activemq.version>
<aspectj.version>1.8.13</aspectj.version>
+ <embedded.postgres.version>2.0.3</embedded.postgres.version>
+ <commons.dbcp2.version>2.13.0</commons.dbcp2.version>
<!--
NOTE: The dependency versions below must be changed in the release
branch up to