This is an automated email from the ASF dual-hosted git repository.
timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git
The following commit(s) were added to refs/heads/master by this push:
new 12772a17 IGNITE-19528 Add CDC examples with cdc-start-up.sh (#285)
12772a17 is described below
commit 12772a170818dbcd67e70503c3640baeb9a2724b
Author: Maksim Davydov <[email protected]>
AuthorDate: Thu Mar 27 18:50:58 2025 +0300
IGNITE-19528 Add CDC examples with cdc-start-up.sh (#285)
---
modules/cdc-ext/README.md | 31 ++
modules/cdc-ext/assembly/cdc-ext.xml | 5 +
.../cdc-ext/examples/cdc-start-up/cdc-start-up.sh | 539 +++++++++++++++++++++
.../config/cdc-start-up/cdc-base-configuration.xml | 83 ++++
.../config/cdc-start-up/cdc-streamer-I2I-thin.xml | 27 ++
.../config/cdc-start-up/cdc-streamer-I2I.xml | 42 ++
.../config/cdc-start-up/cdc-streamer-I2K.xml | 27 ++
.../config/cdc-start-up/cdc-streamer-K2I-thin.xml | 33 ++
.../config/cdc-start-up/cdc-streamer-K2I.xml | 47 ++
.../cdc-start-up/cluster-1/ignite-cdc.properties | 28 ++
.../cluster-1/kafka-to-ignite.properties | 5 +
.../config/cdc-start-up/cluster-1/kafka.properties | 2 +
.../cdc-start-up/cluster-2/ignite-cdc.properties | 28 ++
.../cluster-2/kafka-to-ignite.properties | 5 +
.../config/cdc-start-up/cluster-2/kafka.properties | 2 +
.../cdc-start-up/cluster-3/ignite-cdc.properties | 28 ++
.../cluster-3/kafka-to-ignite.properties | 5 +
.../config/cdc-start-up/cluster-3/kafka.properties | 2 +
18 files changed, 939 insertions(+)
diff --git a/modules/cdc-ext/README.md b/modules/cdc-ext/README.md
new file mode 100644
index 00000000..3aa88d72
--- /dev/null
+++ b/modules/cdc-ext/README.md
@@ -0,0 +1,31 @@
+Apache Ignite Change Data Capture Module
+------------------------
+
+Apache Ignite CDC is a data processing pattern used to asynchronously receive
entries that have been changed on the local node so that action can be taken
using the changed entry.
+
+This module provides clients that implement simple CDC strategies
for inter-cluster replication. Active-Passive and Active-Active replication
strategies can be set up with different CDC clients. A CDC client
must be started for each node participating in CDC.
+
+## Installation
+
+1. Build the `cdc-ext` module with Maven:
+
+```console
+ $~/src/ignite-extensions/> mvn clean install -f modules/cdc-ext
-Pcheckstyle,extension-release,skip-docs -DskipTests
+ $~/src/ignite-extensions/> ls modules/cdc-ext/target | grep zip
+ignite-cdc-ext-bin.zip
+```
+
+2. The resulting binary is located in the `target` directory. Unpack the
`ignite-cdc-ext-bin.zip` archive into the `$IGNITE_HOME` folder to enable CDC.
+
+On Linux/macOS, run the following commands from the Ignite root directory that contains the CDC archive:
+
+```console
+$ unzip ignite-cdc-ext-bin.zip
+$ cp -r ignite-cdc-ext/* .
+```
+
+You now have an additional binary, `$IGNITE_HOME/bin/kafka-to-ignite.sh`,
the `$IGNITE_HOME/libs/optional/ignite-cdc-ext` module, and configuration examples
under `$IGNITE_HOME/examples/config/cdc-start-up`.
+
+To run the examples, use the script
`$IGNITE_HOME/examples/cdc-start-up/cdc-start-up.sh`.
+
+Use Apache Ignite documentation to explore CDC capabilities.
diff --git a/modules/cdc-ext/assembly/cdc-ext.xml
b/modules/cdc-ext/assembly/cdc-ext.xml
index fcbeb587..18c33476 100644
--- a/modules/cdc-ext/assembly/cdc-ext.xml
+++ b/modules/cdc-ext/assembly/cdc-ext.xml
@@ -38,5 +38,10 @@
<outputDirectory>${project.artifactId}/bin</outputDirectory>
<fileMode>755</fileMode>
</fileSet>
+ <fileSet>
+ <directory>${basedir}/examples</directory>
+ <outputDirectory>${project.artifactId}/examples</outputDirectory>
+ <fileMode>755</fileMode>
+ </fileSet>
</fileSets>
</assembly>
diff --git a/modules/cdc-ext/examples/cdc-start-up/cdc-start-up.sh
b/modules/cdc-ext/examples/cdc-start-up/cdc-start-up.sh
new file mode 100644
index 00000000..502677fd
--- /dev/null
+++ b/modules/cdc-ext/examples/cdc-start-up/cdc-start-up.sh
@@ -0,0 +1,539 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# All-in-one CDC start-up manager. Use this script to run examples for CDC
with Apache Ignite.
+#
+
+set -Eeuo pipefail
+trap 'cleanup $LINENO' SIGINT SIGTERM ERR EXIT
+
+SCRIPT_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd -P)
+
+IGNITE_BIN_DIR="${SCRIPT_DIR}/../../bin"
+IGNITE_HOME="${SCRIPT_DIR}/../../"
+IGNITE_LIBS="${SCRIPT_DIR}/../../libs"
+IGNITE_CDC_EXAMPLES_DIR="${SCRIPT_DIR}/../config/cdc-start-up"
+
+CURRENT_PID=$$
+
+#
+# Help message
+#
+# Prints the help summary to stdout and then exits the script.
+#
+# The text is an unquoted here-document, so '\$' is needed to print a literal
+# dollar sign. A backtick at the end of one line paired with a backtick at the
+# start of the next forms an empty command substitution: it expands to nothing
+# and swallows the line break, which lets a single output line be split across
+# several source lines.
+#
+usage() {
+ cat <<EOF
+This is a simple start-up script designed to ease the user experience with
starting CDC for ignite clusters.
+
+Available options:
+
+-h, --help Prints help summary
+-i, --ignite igniteProperties Starts a single node with
provided properties. `
+ `An ignite instance will be started with
basic CDC configuration `
+
`\$IGNITE_HOME/examples/config/cdc-start-up/cdc-base-configuration.xml
+
+ Available options for igniteProperties include:
+ * cluster-1
+ * cluster-2
+
+ Properties files are preconfigured for data replication between cluster-1
and cluster-2.
+
+-c, --ignite-cdc consumerMode igniteProperties `
+ `Starts CDC consumer with specified
transfer mode to parse WAL archives `
+ `from source cluster. igniteProperties
is used to determine the source cluster.
+
+ Available options for --ignite-cdc include:
+ * ignite-to-ignite-thick Creates a single thick client, `
+ `used to transfer data from
source-cluster to destination-cluster.
+ * ignite-to-ignite-thin Creates a single thin client, `
+ `used to transfer data from
source-cluster to destination-cluster.
+ * ignite-to-kafka Creates a cdc consumer, used to
transfer data from source-cluster to specified Kafka topic.
+
+-k, --kafka-to-ignite clientMode igniteProperties `
+ `Starts Kafka topic consumer for data
replication to destination cluster.
+
+ Available options for --kafka-to-ignite include:
+ * thick Creates a single thick client,
used to transfer data from Kafka to destination-cluster.
+ * thin Creates a single thin client,
used to transfer data from Kafka to destination-cluster.
+
+--check-cdc --key keyVal --value jsonVal [--cluster clusterNum] `
+ `Starts CDC check with proposed
(key, value) entry. `
+ `The command puts the entry in the
chosen cluster, and shows the comparison `
+ `of caches between clusters as the
entry reaches the other cluster.
+
+ Options:
+ * --key keyVal Specifies key of the entry.
+ * --value jsonVal Specifies value of the entry as
JSON. Example: '{"val": "val", "ver": "XXX"}'
+ * --cluster clusterNum Optional parameter for the
cluster number (1 or 2) that initially stores the entry. `
+ `The default value is 1.
+EOF
+ exit
+}
+
+#
+# General message output function
+# Arguments:
+# 1 - message to print
+#
+msg() {
+ echo >&2 -e "${ORANGE}[PID=${CURRENT_PID-}]:${NOFORMAT} ${1-}"
+}
+
+#
+# Writes one two-column table row to stderr, prefixed with the PID tag.
+# Each cell is left-aligned and padded to 16 characters.
+# Arguments:
+#   1 - entity for column 1
+#   2 - entity for column 2
+#
+msgPrintf() {
+    local left_cell="$1" right_cell="$2"
+
+    # The color codes must stay in the format string so printf interprets them.
+    local row_format="${ORANGE}[PID=%s]:${NOFORMAT} | %-16s | %-16s |\n"
+
+    printf >&2 "${row_format}" "${CURRENT_PID-$$}" "${left_cell}" "${right_cell}"
+}
+
+#
+# Exits with error
+#
+die() {
+ local msg=$1
+ local code=${2-1}
+ msg "$msg"
+ exit "$code"
+}
+
+#
+# Colors setup function
+#
+setupColors() {
+ if [[ -t 2 ]] && [[ -z "${NO_COLOR-}" ]] && [[ "${TERM-}" != "dumb" ]];
then
+ NOFORMAT='\033[0m' RED='\033[0;31m' GREEN='\033[0;32m'
ORANGE='\033[0;33m'
+ BLUE='\033[0;34m' PURPLE='\033[0;35m' CYAN='\033[0;36m'
YELLOW='\033[1;33m'
+ else
+ NOFORMAT='' RED='' GREEN='' ORANGE='' BLUE='' PURPLE='' CYAN='' YELLOW=''
+ fi
+}
+
+#
+# Script setup. Initializes the color globals and prints the start banner.
+#
+setup() {
+    setupColors
+
+    local banner="${PURPLE}CDC start-up manager [PID=${CURRENT_PID-}]${NOFORMAT}"
+    msg "${banner}"
+}
+
+#
+# Clean-up function for exit and interruption
+#
+cleanup() {
+ trap - SIGINT SIGTERM ERR EXIT
+
+ msg "${PURPLE}CDC start-up manager [PID=${CURRENT_PID-}] is closed
${NOFORMAT}"
+}
+
+#
+# General information message output function
+# Arguments:
+# 1 - message to print
+#
+infoMsg() {
+ msg "${GREEN}${1-}${NOFORMAT}"
+}
+
+#
+# Simple util function to check argument presence for specified parent argument
+# Arguments:
+# 1 - parent command argument
+# 2 - argument name to check
+# 3 - argument to check
+#
+checkMissing() {
+ local parent_arg_name=$1
+ local arg_name=$2
+
+ local arg_to_check=$3
+
+ [[ -z $arg_to_check ]] && die "Missing script argument
[""${arg_name-}""] for ""${parent_arg_name-}""!"
+
+ return 0
+}
+
+#
+# Checks --ignite arguments
+# Globals:
+# ignite_properties_path - '.properties' holder path. The file is used to
configure server node
+# Arguments:
+# "$@" - script command arguments
+#
+checkServerParams() {
+ checkMissing "${script_param-}" "igniteProperties" "${2-}"
+
+ ignite_properties_path="${IGNITE_CDC_EXAMPLES_DIR}"/${2-}
+}
+
+#
+# Checks --ignite-cdc arguments
+# Globals:
+# consumer_mode - Transfer type for CDC
+# cdc_streamer_xml_file_name - '.xml' filename of the specified transfer type
+# ignite_properties_path - '.properties' holder path. The file is used to
configure CDC consumer
+# Arguments:
+# "$@" - script command arguments
+#
+checkConsumerParams() {
+ consumer_mode=${2-}
+
+ checkMissing "${script_param-}" "consumerMode" "${consumer_mode-}"
+
+ case $consumer_mode in
+ ignite-to-ignite-thick) export
cdc_streamer_xml_file_name="cdc-streamer-I2I.xml" ;;
+ ignite-to-ignite-thin) export
cdc_streamer_xml_file_name="cdc-streamer-I2I-thin.xml" ;;
+ ignite-to-kafka) export
cdc_streamer_xml_file_name="cdc-streamer-I2K.xml" ;;
+ *) die "Unknown consumer mode for CDC: ${consumer_mode-}" ;;
+ esac
+
+ checkMissing "${consumer_mode-}" "igniteProperties" "${3-}"
+
+ ignite_properties_path="${IGNITE_CDC_EXAMPLES_DIR}"/${3-}
+
+ return 0
+}
+
+#
+# Checks --kafka-to-ignite arguments
+# Globals:
+# client_mode - Transfer type for CDC
+# cdc_streamer_xml_file_name - '.xml' filename of the specified transfer type
+# ignite_properties_path - '.properties' holder path. The file is used to
configure CDC client
+# Arguments:
+# "$@" - script command arguments
+#
+checkKafkaConsumerParams() {
+ client_mode=${2-}
+
+ checkMissing "${script_param-}" "clientMode" "${client_mode-}"
+
+ case $client_mode in
+ thick) ;;
+ thin) ;;
+ *) die "Unknown client mode for CDC: ${client_mode-}" ;;
+ esac
+
+ checkMissing "${client_mode-}" "igniteProperties" "${3-}"
+
+ ignite_properties_path="${IGNITE_CDC_EXAMPLES_DIR}"/${3-}
+
+ return 0
+}
+
+#
+# Checks --check-cdc arguments
+# Globals:
+# key - Entity key
+# value - Entity value in JSON
+# cluster - Cluster to put entity in
+# Arguments:
+# "$@" - script command arguments
+#
+checkEntriesParams() {
+ cluster=1
+ local jsonValue=""
+
+ while :; do
+ case "${2-}" in
+ --key) key="${3-}"; shift ;;
+ --value) jsonValue="${3-}"; shift ;;
+ --cluster) cluster="${3-}"; shift ;;
+ -?*) die "Unknown option: ${script_param-}" ;;
+ *) break ;;
+ esac
+ shift
+ done
+
+ checkMissing "${script_param-}" "key" "${key-}"
+ checkMissing "${script_param-}" "value" "${jsonValue-}"
+
+ value=$(echo $jsonValue | awk -F'"val": ' '{ print $2 }' | awk -F',' '{
print $1 }' | awk -F'}' '{print $1}')
+ version=$(echo $jsonValue | awk -F'"ver": ' '{ print $2 }' | awk -F'}'
'{ print $1 }')
+
+ checkMissing "${jsonValue}" "val field" "${value-}"
+
+ return 0
+}
+
+#
+# Checks if all required optional libraries enabled for CDC check
+#
+checkLibraries() {
+ local lib1="ignite-rest-http";
+ local lib2="ignite-json";
+
+ if [ ! -d "$IGNITE_LIBS/$lib1" ] && [ ! -d "$IGNITE_LIBS/$lib2" ]; then
+ die "${RED}Failure! Check that $lib1 and $lib2 optional libraries are
enabled.";
+ elif [ ! -d "$IGNITE_LIBS/$lib1" ]; then
+ die "${RED}Failure! Check that $lib1 optional library is enabled.";
+ elif [ ! -d "$IGNITE_LIBS/$lib2" ]; then
+ die "${RED}Failure! Check that $lib2 optional library is enabled.";
+ fi
+}
+
+#
+# Starts single Ignite instance
+# cdc-base-configuration needs dummy streamer to start, which is why
cdc_streamer_xml_file_name is exported
+#
+startIgnite() {
+ infoMsg "Starting Ignite for ${ignite_properties_path-}"
+
+ export cdc_streamer_xml_file_name="cdc-streamer-I2I.xml"
+ export ignite_properties_path
+
+ "${IGNITE_BIN_DIR}"/ignite.sh -v
"${IGNITE_CDC_EXAMPLES_DIR}"/cdc-base-configuration.xml
+}
+
+#
+# Starts single CDC consumer instance
+#
+startCdcConsumer() {
+ infoMsg "Starting CDC consumer for ${ignite_properties_path-} with
${consumer_mode-}"
+
+ export ignite_properties_path
+ export IGNITE_HOME
+
+ source "${IGNITE_BIN_DIR}"/ignite-cdc.sh -v
"${IGNITE_CDC_EXAMPLES_DIR}"/cdc-base-configuration.xml ;
+}
+
+#
+# Starts single Kafka consumer instance
+#
+startKafkaConsumer() {
+ infoMsg "Starting Kafka topic consumer for ${ignite_properties_path-}
with ${client_mode-}"
+
+ export ignite_properties_path
+ export IGNITE_HOME
+
+ case $client_mode in
+ thick) source "${IGNITE_BIN_DIR}"/kafka-to-ignite.sh -v
"${IGNITE_CDC_EXAMPLES_DIR}"/cdc-streamer-K2I.xml ;;
+ thin) source "${IGNITE_BIN_DIR}"/kafka-to-ignite.sh -v
"${IGNITE_CDC_EXAMPLES_DIR}"/cdc-streamer-K2I-thin.xml ;;
+ *) die "Unknown client mode for CDC: ${client_mode-}" ;;
+ esac
+}
+
+#
+# Prints the horizontal delimiter of the two-column table
+#
+printLine() {
+    local table_rule="+------------------+------------------+"
+
+    msg "${table_rule}"
+}
+
+#
+# Prints the table header row that names the two clusters
+#
+printClusterMsg() {
+    local left_header="cluster-1"
+    local right_header="cluster-2"
+
+    msgPrintf "${left_header}" "${right_header}"
+}
+
+#
+# Fetches values from clusters
+# Globals:
+# value1 - Entry value from cluster 1
+# version1 - Entry version from cluster 1
+# value2 - Entry value from cluster 2
+# version2 - Entry version from cluster 2
+#
+updateValues() {
+ local json1=$(curl -s
'http://localhost:8080/ignite?cmd=get&key='$key'&cacheName=terminator&keyType=String&valueType=IgniteBiTuple'
-X GET -H 'Content-Type: application/x-www-form-urlencoded')
+ local json2=$(curl -s
'http://localhost:8081/ignite?cmd=get&key='$key'&cacheName=terminator&keyType=String&valueType=IgniteBiTuple'
-X GET -H 'Content-Type: application/x-www-form-urlencoded')
+
+ value1=$(echo $json1 | awk -F'val1":' '{ print $2 }' | awk -F',' '{ print $1
}' | awk -F'}' '{print $1}')
+ version1=$(echo $json1 | awk -F'val2":' '{ print $2 }' | awk -F',' '{ print
$1 }' | awk -F'}' '{print $1}')
+
+ value2=$(echo $json2 | awk -F'val1":' '{ print $2 }' | awk -F',' '{ print $1
}' | awk -F'}' '{print $1}')
+ version2=$(echo $json2 | awk -F'val2":' '{ print $2 }' | awk -F',' '{ print
$1 }' | awk -F'}' '{print $1}')
+
+ if [[ -z $value1 || $value1 == "" ]]; then
+ value1="-"; version1="-";
+ fi
+
+ if [[ -z $value2 || $value2 == "" ]]; then
+ value2="-"; version2="-";
+ fi
+}
+
+#
+# Refreshes the values from both clusters for the specified key and prints
+# them as two table rows: values first, versions second.
+#
+printValuesPair() {
+    updateValues
+
+    local left_val="val: ${value1-}" right_val="val: ${value2-}"
+    local left_ver="ver: ${version1-}" right_ver="ver: ${version2-}"
+
+    msgPrintf "${left_val}" "${right_val}"
+    msgPrintf "${left_ver}" "${right_ver}"
+}
+
+#
+# Use a function to construct the data payload based on whether version is set
+#
+constructPayload() {
+ local value=$1
+ local version=$2
+ local payload="%7B%22val1%22%3A$value" # always include val1
+
+ if [[ -n "$version" ]]; then # Check if version is not empty
+ payload="${payload}%2C%22val2%22%3A$version" # Add val2 if version is set
+ fi
+
+ payload="${payload}%7D" # close the json
+ echo "$payload"
+}
+
+#
+# Determine the correct port based on the cluster value
+#
+getPort() {
+ if [[ "$cluster" == "1" ]]; then
+ echo "8080"
+ else
+ echo "8081"
+ fi
+}
+
+#
+# Puts the entry in the specified cluster through its REST endpoint.
+# Terminates through die when the request cannot be sent or when the response
+# does not report successStatus 0.
+# Globals:
+#   key, value, version - (read) entry to store
+#   cluster - (read, through getPort) target cluster
+#
+pushEntry() {
+    msg "Pushing entry..."
+
+    local payload port url result successStatus errorMsg
+
+    payload=$(constructPayload "$value" "$version")
+    port=$(getPort)
+
+    url="http://localhost:${port}/ignite?cmd=put&key=${key}&val=${payload}&cacheName=terminator&keyType=String&valueType=IgniteBiTuple"
+
+    # curl runs with -s, so a connection failure would otherwise end the script
+    # (set -e) without any explanation.
+    if ! result=$(curl -s "$url" -X POST -H 'Content-Type: application/x-www-form-urlencoded'); then
+        msg ""; die "${RED}Failed to reach the REST endpoint at localhost:${port}${NOFORMAT}"
+    fi
+
+    # The response is passed quoted to avoid word splitting and globbing.
+    successStatus=$(printf '%s' "${result}" \
+        | tr '\n' ' ' \
+        | awk -F'successStatus":' '{ print $2 }' \
+        | awk -F',' '{ print $1 }' \
+        | awk -F'}' '{ print $1 }')
+    successStatus="${successStatus//[[:space:]]/}"
+
+    errorMsg=$(printf '%s' "${result}" \
+        | tr '\n' ' ' \
+        | awk -F'"error":"' '{gsub(/\\n.*/,"",$2); gsub(/.*reason=/,"",$2); print $2}')
+
+    # Anything other than an explicit 0 is a failure. This includes an empty or
+    # unparseable response, which an arithmetic test would let through.
+    if [[ "${successStatus}" != "0" ]]; then
+        msg ""; die "${RED}${errorMsg:-Unexpected response from localhost:${port}: ${result}}${NOFORMAT}"
+    fi
+
+    msg "Success"
+    msg ""
+}
+
+#
+# Prints the values from clusters for specified key followed by a delimiter,
+# then stores the current epoch time in current_time_s for the caller's
+# timeout check.
+# Globals:
+#   current_time_s - (written) seconds since the epoch
+#
+iterateValuesCheck() {
+    printValuesPair
+    printLine
+
+    current_time_s="$(date +%s)"
+}
+
+#
+# Trails clusters entries for specified key
+#
+printValuesUntilSuccess() {
+ local start_time_s=$(date +%s)
+ declare current_time_s=$(date +%s)
+
+ while iterateValuesCheck; ([[ $value1 != $value2 || $version1 !=
$version2 ]]) && ((current_time_s - start_time_s <= 60)); do
+ sleep 1
+ done
+
+ if ((current_time_s - start_time_s > 60)); then
+ msg ""; die "${RED}Replication timed out! Check CDC cycle${NOFORMAT}";
+ fi
+
+ msg ""
+ msg "Success"
+}
+
+#
+# General function to check CDC entry transport.
+# Shows the entry on both clusters, pushes the new entry, then trails both
+# clusters until the entry matches or the check times out.
+#
+performCheck() {
+    infoMsg "CDC check started"
+    msg ""
+
+    # State before the push.
+    printLine; printClusterMsg; printLine
+    printValuesPair
+    printLine
+    msg ""
+
+    pushEntry
+
+    # State after the push, repeated until both clusters agree.
+    printLine; printClusterMsg; printLine
+    printValuesUntilSuccess
+
+    return 0
+}
+
+#
+# Main function to parse commands arguments
+#
+parseParams() {
+ script_param=${1-}
+
+ [[ -z $script_param ]] && die "Missing script parameter! Use --help to
see available options."
+
+ case $script_param in
+ -i | --ignite)
+ checkServerParams "$@"
+ checkLibraries
+ startIgnite
+ ;;
+ -c | --ignite-cdc)
+ checkConsumerParams "$@"
+ checkLibraries
+ startCdcConsumer
+ ;;
+ -k | --kafka-to-ignite)
+ checkKafkaConsumerParams "$@"
+ checkLibraries
+ startKafkaConsumer
+ ;;
+ --check-cdc)
+ checkEntriesParams "$@"
+ checkLibraries
+ performCheck
+ ;;
+ -h | --help) usage ;;
+ -?*) die "Unknown option: ${script_param-}" ;;
+ *) die "Unknown input: ${script_param-}" ;;
+ esac
+
+ return 0
+}
+
+#
+# Script Main Body
+#
+
+setup
+
+parseParams "$@"
diff --git
a/modules/cdc-ext/examples/config/cdc-start-up/cdc-base-configuration.xml
b/modules/cdc-ext/examples/config/cdc-start-up/cdc-base-configuration.xml
new file mode 100644
index 00000000..cfcbad24
--- /dev/null
+++ b/modules/cdc-ext/examples/config/cdc-start-up/cdc-base-configuration.xml
@@ -0,0 +1,83 @@
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xmlns:context="http://www.springframework.org/schema/context"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
+ http://www.springframework.org/schema/util/spring-util.xsd
+ http://www.springframework.org/schema/context
+
http://www.springframework.org/schema/context/spring-context-3.0.xsd">
+
+ <import resource="${cdc_streamer_xml_file_name}"/>
+
+ <context:property-placeholder
location="file:${ignite_properties_path}/ignite-cdc.properties"/>
+
+ <!--IgniteConfiguration for source cluster connection-->
+ <bean class="org.apache.ignite.configuration.IgniteConfiguration">
+ <property name="igniteInstanceName" value="${server_instance_name}"/>
+ <property name="consistentId" value="${server_instance_name}"/>
+ <property name="localHost" value="${server_host}"/>
+ <property name="discoverySpi" ref="ignTcpDiscoverySpi"/>
+ <property name="dataStorageConfiguration"
ref="ignDataStorageConfiguration"/>
+ <property name="pluginProviders"
ref="ignCacheVersionConflictResolverPluginProvider"/>
+ <property name="cacheConfiguration" ref="ignCacheConfiguration"/>
+ <property name="clientConnectorConfiguration"
ref="ignClientConnectorConfiguration"/>
+ </bean>
+
+ <!--CdcConfiguration-->
+ <bean id="cdc.cfg" class="org.apache.ignite.cdc.CdcConfiguration">
+ <property name="consumer" ref="cdc.streamer"/>
+ </bean>
+
+ <!--IgniteConfiguration property declaration-->
+
+ <!--TcpDiscoverySpi-->
+ <bean id="ignTcpDiscoverySpi"
class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
+ <property name="ipFinder">
+ <bean
class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
+ <property name="addresses"
value="${server_host}:${server_port_range}"/>
+ </bean>
+ </property>
+
+ <property name="localPort" value="${server_local_port}"/>
+ <property name="joinTimeout" value="10000"/>
+ </bean>
+
+ <!--DataStorageConfiguration-->
+ <bean id="ignDataStorageConfiguration"
class="org.apache.ignite.configuration.DataStorageConfiguration">
+ <property name="defaultDataRegionConfiguration">
+ <bean
class="org.apache.ignite.configuration.DataRegionConfiguration">
+ <property name="cdcEnabled" value="true"/>
+ </bean>
+ </property>
+
+ <property name="walForceArchiveTimeout" value="5000"/>
+ </bean>
+
+ <!--CacheVersionConflictResolverPluginProvider-->
+ <bean id="ignCacheVersionConflictResolverPluginProvider"
class="org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverPluginProvider">
+ <property name="caches">
+ <set>
+ <value>terminator</value>
+ </set>
+ </property>
+
+ <property name="clusterId" value="${cluster_id}"/>
+ <property name="conflictResolveField" value="val2"/>
+ </bean>
+
+ <!--CacheConfiguration-->
+ <util:list id="ignCacheConfiguration"
value-type="org.apache.ignite.configuration.CacheConfiguration">
+ <bean class="org.apache.ignite.configuration.CacheConfiguration">
+ <property name="atomicityMode" value="ATOMIC"/>
+ <property name="name" value="terminator"/>
+ </bean>
+ </util:list>
+
+ <!--ClientConnectorConfiguration-->
+ <bean id="ignClientConnectorConfiguration"
class="org.apache.ignite.configuration.ClientConnectorConfiguration">
+ <property name="port" value="${server_client_connector_port}"/>
+ </bean>
+</beans>
diff --git
a/modules/cdc-ext/examples/config/cdc-start-up/cdc-streamer-I2I-thin.xml
b/modules/cdc-ext/examples/config/cdc-start-up/cdc-streamer-I2I-thin.xml
new file mode 100644
index 00000000..b831824e
--- /dev/null
+++ b/modules/cdc-ext/examples/config/cdc-start-up/cdc-streamer-I2I-thin.xml
@@ -0,0 +1,27 @@
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
+ http://www.springframework.org/schema/util/spring-util.xsd">
+
+ <!--IgniteToIgniteClientCdcStreamer-->
+ <bean id="cdc.streamer"
class="org.apache.ignite.cdc.thin.IgniteToIgniteClientCdcStreamer">
+ <property name="destinationClientConfiguration">
+ <bean class="org.apache.ignite.configuration.ClientConfiguration">
+ <property name="addresses"
value="${destination_host}:${destination_client_connector_port}"/>
+ </bean>
+ </property>
+
+ <property name="caches">
+ <list>
+ <value>terminator</value>
+ </list>
+ </property>
+
+ <property name="onlyPrimary" value="false"/>
+ <property name="maxBatchSize" value="1024"/>
+ </bean>
+</beans>
diff --git a/modules/cdc-ext/examples/config/cdc-start-up/cdc-streamer-I2I.xml
b/modules/cdc-ext/examples/config/cdc-start-up/cdc-streamer-I2I.xml
new file mode 100644
index 00000000..af7afa8c
--- /dev/null
+++ b/modules/cdc-ext/examples/config/cdc-start-up/cdc-streamer-I2I.xml
@@ -0,0 +1,42 @@
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
+ http://www.springframework.org/schema/util/spring-util.xsd">
+
+ <!--IgniteToIgniteCdcStreamer-->
+ <bean id="cdc.streamer"
class="org.apache.ignite.cdc.IgniteToIgniteCdcStreamer">
+ <property name="destinationIgniteConfiguration">
+ <bean class="org.apache.ignite.configuration.IgniteConfiguration">
+ <property name="igniteInstanceName"
value="${cdc_client_instance_name}"/>
+ <property name="clientMode" value="true"/>
+ <property name="localHost" value="${server_host}"/>
+ <property name="discoverySpi"
ref="destination.TcpDiscoverySpi"/>
+ </bean>
+ </property>
+
+ <property name="caches">
+ <list>
+ <value>terminator</value>
+ </list>
+ </property>
+
+ <property name="onlyPrimary" value="false"/>
+ <property name="maxBatchSize" value="1024"/>
+ </bean>
+
+ <!--Destination TcpDiscoverySpi-->
+ <bean id="destination.TcpDiscoverySpi"
class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
+ <property name="ipFinder">
+ <bean
class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
+ <property name="addresses"
value="${destination_host}:${destination_port_range}"/>
+ </bean>
+ </property>
+
+ <property name="localPort" value="${destination_local_port}"/>
+ <property name="joinTimeout" value="10000"/>
+ </bean>
+</beans>
diff --git a/modules/cdc-ext/examples/config/cdc-start-up/cdc-streamer-I2K.xml
b/modules/cdc-ext/examples/config/cdc-start-up/cdc-streamer-I2K.xml
new file mode 100644
index 00000000..b55d58fc
--- /dev/null
+++ b/modules/cdc-ext/examples/config/cdc-start-up/cdc-streamer-I2K.xml
@@ -0,0 +1,27 @@
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
+ http://www.springframework.org/schema/util/spring-util.xsd">
+
+ <!--Kafka properties-->
+ <util:properties id="kafkaProperties"
location="file:${ignite_properties_path}/kafka.properties"/>
+
+ <!--IgniteToKafkaCdcStreamer-->
+ <bean id="cdc.streamer"
class="org.apache.ignite.cdc.kafka.IgniteToKafkaCdcStreamer">
+ <property name="topic" value="${send_data_kafka_topic_name}"/>
+ <property name="metadataTopic"
value="${send_metadata_kafka_topic_name}"/>
+ <property name="kafkaPartitions" value="${send_kafka_partitions}"/>
+ <property name="caches">
+ <list>
+ <value>terminator</value>
+ </list>
+ </property>
+ <property name="maxBatchSize" value="1024"/>
+ <property name="onlyPrimary" value="false"/>
+ <property name="kafkaProperties" ref="kafkaProperties"/>
+ </bean>
+</beans>
diff --git
a/modules/cdc-ext/examples/config/cdc-start-up/cdc-streamer-K2I-thin.xml
b/modules/cdc-ext/examples/config/cdc-start-up/cdc-streamer-K2I-thin.xml
new file mode 100644
index 00000000..e4f639e4
--- /dev/null
+++ b/modules/cdc-ext/examples/config/cdc-start-up/cdc-streamer-K2I-thin.xml
@@ -0,0 +1,33 @@
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xmlns:context="http://www.springframework.org/schema/context"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
+ http://www.springframework.org/schema/util/spring-util.xsd
+ http://www.springframework.org/schema/context
+
http://www.springframework.org/schema/context/spring-context-3.0.xsd">
+
+ <context:property-placeholder
location="file:${ignite_properties_path}/ignite-cdc.properties"/>
+
+ <util:properties id="kafkaProperties"
location="file:${ignite_properties_path}/kafka-to-ignite.properties"/>
+
+ <bean id="streamer.cfg"
class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
+ <property name="topic" value="${receive_data_kafka_topic_name}"/>
+ <property name="metadataTopic"
value="${receive_metadata_kafka_topic_name}"/>
+ <property name="kafkaPartsFrom"
value="${receive_kafka_partitions_from}"/>
+ <property name="kafkaPartsTo" value="${receive_kafka_partitions_to}"/>
+ <property name="threadCount"
value="${receive_data_process_thread_number}"/>
+ <property name="caches">
+ <list>
+ <value>terminator</value>
+ </list>
+ </property>
+ </bean>
+
+ <bean id="client.cfg"
class="org.apache.ignite.configuration.ClientConfiguration">
+ <property name="addresses"
value="${server_host}:${server_client_connector_port}"/>
+ </bean>
+</beans>
diff --git a/modules/cdc-ext/examples/config/cdc-start-up/cdc-streamer-K2I.xml
b/modules/cdc-ext/examples/config/cdc-start-up/cdc-streamer-K2I.xml
new file mode 100644
index 00000000..6d3f8689
--- /dev/null
+++ b/modules/cdc-ext/examples/config/cdc-start-up/cdc-streamer-K2I.xml
@@ -0,0 +1,47 @@
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:util="http://www.springframework.org/schema/util"
+ xmlns:context="http://www.springframework.org/schema/context"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util
+ http://www.springframework.org/schema/util/spring-util.xsd
+ http://www.springframework.org/schema/context
+
http://www.springframework.org/schema/context/spring-context-3.0.xsd">
+
+ <context:property-placeholder
location="file:${ignite_properties_path}/ignite-cdc.properties"/>
+
+ <util:properties id="kafkaProperties"
location="file:${ignite_properties_path}/kafka-to-ignite.properties"/>
+
+ <bean id="streamer.cfg"
class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
+ <property name="topic" value="${receive_data_kafka_topic_name}"/>
+ <property name="metadataTopic"
value="${receive_metadata_kafka_topic_name}"/>
+ <property name="kafkaPartsFrom"
value="${receive_kafka_partitions_from}"/>
+ <property name="kafkaPartsTo" value="${receive_kafka_partitions_to}"/>
+ <property name="threadCount"
value="${receive_data_process_thread_number}"/>
+ <property name="caches">
+ <list>
+ <value>terminator</value>
+ </list>
+ </property>
+ </bean>
+
+ <bean id="ignIgniteConfiguration"
class="org.apache.ignite.configuration.IgniteConfiguration">
+ <property name="discoverySpi" ref="ignTcpDiscoverySpi"/>
+ <property name="clientMode" value="true"/>
+ <property name="consistentId" value="${cdc_client_instance_name}"/>
+ </bean>
+
+ <!--TcpDiscoverySpi-->
+ <bean id="ignTcpDiscoverySpi"
class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
+ <property name="ipFinder">
+ <bean
class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
+ <property name="addresses"
value="${server_host}:${server_port_range}"/>
+ </bean>
+ </property>
+
+ <property name="localPort" value="${server_local_port}"/>
+ <property name="joinTimeout" value="10000"/>
+ </bean>
+</beans>
diff --git
a/modules/cdc-ext/examples/config/cdc-start-up/cluster-1/ignite-cdc.properties
b/modules/cdc-ext/examples/config/cdc-start-up/cluster-1/ignite-cdc.properties
new file mode 100644
index 00000000..fc972cdd
--- /dev/null
+++
b/modules/cdc-ext/examples/config/cdc-start-up/cluster-1/ignite-cdc.properties
@@ -0,0 +1,28 @@
+# Host server configuration
+# server_host, server_port_range and server_local_port are substituted into the
+# TcpDiscoverySpi settings of the streamer XML configuration.
+server_instance_name=server-1
+server_host=127.0.0.1
+server_local_port=47500
+server_port_range=47500..47510
+server_client_connector_port=10800
+cluster_id=1
+
+# Receiver server configuration
+# These values equal the server_* values in cluster-2/ignite-cdc.properties.
+destination_host=127.0.0.1
+destination_local_port=47600
+destination_port_range=47600..47610
+destination_client_connector_port=10850
+
+# General CDC consumer configuration
+# Used as the consistentId of the client node in the streamer XML configuration.
+cdc_client_instance_name=cdc-streamer-from-1-to-2
+
+# Kafka CDC consumer configuration
+# Topic names equal the receive_* topic names in cluster-2/ignite-cdc.properties.
+send_data_kafka_topic_name=dc1_to_dc2
+send_metadata_kafka_topic_name=metadata_from_dc1
+send_kafka_partitions=16
+
+# Kafka CDC receiver configuration
+# Topic names equal the send_* topic names in cluster-2/ignite-cdc.properties.
+receive_data_kafka_topic_name=dc2_to_dc1
+receive_metadata_kafka_topic_name=metadata_from_dc2
+receive_kafka_partitions_from=0
+receive_kafka_partitions_to=16
+receive_data_process_thread_number=4
\ No newline at end of file
diff --git
a/modules/cdc-ext/examples/config/cdc-start-up/cluster-1/kafka-to-ignite.properties
b/modules/cdc-ext/examples/config/cdc-start-up/cluster-1/kafka-to-ignite.properties
new file mode 100644
index 00000000..bbdb4fe0
--- /dev/null
+++
b/modules/cdc-ext/examples/config/cdc-start-up/cluster-1/kafka-to-ignite.properties
@@ -0,0 +1,5 @@
+bootstrap.servers=127.0.0.1:9092
+request.timeout.ms=10000
+group.id=kafka-to-ignite-dc1
+auto.offset.reset=earliest
+enable.auto.commit=false
\ No newline at end of file
diff --git
a/modules/cdc-ext/examples/config/cdc-start-up/cluster-1/kafka.properties
b/modules/cdc-ext/examples/config/cdc-start-up/cluster-1/kafka.properties
new file mode 100644
index 00000000..11af0ce8
--- /dev/null
+++ b/modules/cdc-ext/examples/config/cdc-start-up/cluster-1/kafka.properties
@@ -0,0 +1,2 @@
+bootstrap.servers=127.0.0.1:9092
+request.timeout.ms=10000
\ No newline at end of file
diff --git
a/modules/cdc-ext/examples/config/cdc-start-up/cluster-2/ignite-cdc.properties
b/modules/cdc-ext/examples/config/cdc-start-up/cluster-2/ignite-cdc.properties
new file mode 100644
index 00000000..61b55e17
--- /dev/null
+++
b/modules/cdc-ext/examples/config/cdc-start-up/cluster-2/ignite-cdc.properties
@@ -0,0 +1,28 @@
+# Host server configuration
+# server_host, server_port_range and server_local_port are substituted into the
+# TcpDiscoverySpi settings of the streamer XML configuration.
+server_instance_name=server-2
+server_host=127.0.0.1
+server_local_port=47600
+server_port_range=47600..47610
+server_client_connector_port=10850
+cluster_id=2
+
+# Receiver server configuration
+# These values equal the server_* values in cluster-1/ignite-cdc.properties.
+destination_host=127.0.0.1
+destination_local_port=47500
+destination_port_range=47500..47510
+destination_client_connector_port=10800
+
+# General CDC consumer configuration
+# Used as the consistentId of the client node in the streamer XML configuration.
+cdc_client_instance_name=cdc-streamer-from-2-to-1
+
+# Kafka CDC consumer configuration
+# Topic names equal the receive_* topic names in cluster-1/ignite-cdc.properties.
+send_data_kafka_topic_name=dc2_to_dc1
+send_metadata_kafka_topic_name=metadata_from_dc2
+send_kafka_partitions=16
+
+# Kafka CDC receiver configuration
+# Topic names equal the send_* topic names in cluster-1/ignite-cdc.properties.
+receive_data_kafka_topic_name=dc1_to_dc2
+receive_metadata_kafka_topic_name=metadata_from_dc1
+receive_kafka_partitions_from=0
+receive_kafka_partitions_to=16
+receive_data_process_thread_number=4
\ No newline at end of file
diff --git
a/modules/cdc-ext/examples/config/cdc-start-up/cluster-2/kafka-to-ignite.properties
b/modules/cdc-ext/examples/config/cdc-start-up/cluster-2/kafka-to-ignite.properties
new file mode 100644
index 00000000..20ffb034
--- /dev/null
+++
b/modules/cdc-ext/examples/config/cdc-start-up/cluster-2/kafka-to-ignite.properties
@@ -0,0 +1,5 @@
+bootstrap.servers=127.0.0.1:9092
+request.timeout.ms=10000
+group.id=kafka-to-ignite-dc2
+auto.offset.reset=earliest
+enable.auto.commit=false
\ No newline at end of file
diff --git
a/modules/cdc-ext/examples/config/cdc-start-up/cluster-2/kafka.properties
b/modules/cdc-ext/examples/config/cdc-start-up/cluster-2/kafka.properties
new file mode 100644
index 00000000..11af0ce8
--- /dev/null
+++ b/modules/cdc-ext/examples/config/cdc-start-up/cluster-2/kafka.properties
@@ -0,0 +1,2 @@
+bootstrap.servers=127.0.0.1:9092
+request.timeout.ms=10000
\ No newline at end of file
diff --git
a/modules/cdc-ext/examples/config/cdc-start-up/cluster-3/ignite-cdc.properties
b/modules/cdc-ext/examples/config/cdc-start-up/cluster-3/ignite-cdc.properties
new file mode 100644
index 00000000..b18087cc
--- /dev/null
+++
b/modules/cdc-ext/examples/config/cdc-start-up/cluster-3/ignite-cdc.properties
@@ -0,0 +1,28 @@
+# Host server configuration
+# server_host, server_port_range and server_local_port are substituted into the
+# TcpDiscoverySpi settings of the streamer XML configuration.
+server_instance_name=server-3
+server_host=127.0.0.1
+server_local_port=47700
+server_port_range=47700..47710
+server_client_connector_port=10900
+cluster_id=3
+
+# Receiver server configuration
+# These values equal the server_* values in cluster-1/ignite-cdc.properties.
+destination_host=127.0.0.1
+destination_local_port=47500
+destination_port_range=47500..47510
+destination_client_connector_port=10800
+
+# General CDC consumer configuration
+# Used as the consistentId of the client node in the streamer XML configuration.
+cdc_client_instance_name=cdc-streamer-from-3-to-1
+
+# Kafka CDC consumer configuration
+# NOTE(review): these topic names are identical to the send_* values in
+# cluster-2/ignite-cdc.properties (dc2_to_dc1, metadata_from_dc2) although this
+# is cluster 3. Confirm this is intended and not a copy-paste leftover.
+send_data_kafka_topic_name=dc2_to_dc1
+send_metadata_kafka_topic_name=metadata_from_dc2
+send_kafka_partitions=16
+
+# Kafka CDC receiver configuration
+# NOTE(review): these topic names are identical to the receive_* values in
+# cluster-2/ignite-cdc.properties, and they are cluster 1's send_* topics while
+# the destination above is also cluster 1. Confirm this is intended.
+receive_data_kafka_topic_name=dc1_to_dc2
+receive_metadata_kafka_topic_name=metadata_from_dc1
+receive_kafka_partitions_from=0
+receive_kafka_partitions_to=16
+receive_data_process_thread_number=4
diff --git
a/modules/cdc-ext/examples/config/cdc-start-up/cluster-3/kafka-to-ignite.properties
b/modules/cdc-ext/examples/config/cdc-start-up/cluster-3/kafka-to-ignite.properties
new file mode 100644
index 00000000..20ffb034
--- /dev/null
+++
b/modules/cdc-ext/examples/config/cdc-start-up/cluster-3/kafka-to-ignite.properties
@@ -0,0 +1,5 @@
+# Loaded as the "kafkaProperties" bean by the Kafka-to-Ignite streamer XML configuration.
+bootstrap.servers=127.0.0.1:9092
+request.timeout.ms=10000
+# NOTE(review): same group.id as cluster-2/kafka-to-ignite.properties, while
+# cluster-1 uses kafka-to-ignite-dc1. Confirm that cluster 3 is meant to share
+# cluster 2's consumer group rather than use its own (e.g. a dc3 suffix).
+group.id=kafka-to-ignite-dc2
+auto.offset.reset=earliest
+enable.auto.commit=false
\ No newline at end of file
diff --git
a/modules/cdc-ext/examples/config/cdc-start-up/cluster-3/kafka.properties
b/modules/cdc-ext/examples/config/cdc-start-up/cluster-3/kafka.properties
new file mode 100644
index 00000000..11af0ce8
--- /dev/null
+++ b/modules/cdc-ext/examples/config/cdc-start-up/cluster-3/kafka.properties
@@ -0,0 +1,2 @@
+bootstrap.servers=127.0.0.1:9092
+request.timeout.ms=10000
\ No newline at end of file