This is an automated email from the ASF dual-hosted git repository.

ilgrosso pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/syncope.git


The following commit(s) were added to refs/heads/master by this push:
     new 284c17df3d [SYNCOPE-1617] Debezium with LiveSync (#1025)
284c17df3d is described below

commit 284c17df3df1593b2947679d63e84a0f5a4cc138
Author: Francesco Chicchiriccò <[email protected]>
AuthorDate: Wed Mar 12 12:46:15 2025 +0100

    [SYNCOPE-1617] Debezium with LiveSync (#1025)
---
 .github/workflows/fit_Tomcat_Debezium.yml          |  53 +++++
 fit/core-reference/pom.xml                         | 208 ++++++++++++++++++
 .../reference/DebeziumLiveSyncDeltaMapper.java     | 115 ++++++++++
 .../apache/syncope/fit/core/DebeziumITCase.java    | 234 +++++++++++++++++++++
 .../apache/syncope/fit/core/LiveSyncITCase.java    |   2 +-
 .../src/test/resources/debezium_mysql_init.sql     |  20 ++
 fit/wa-reference/pom.xml                           |   2 +-
 pom.xml                                            |   4 +-
 8 files changed, 635 insertions(+), 3 deletions(-)

diff --git a/.github/workflows/fit_Tomcat_Debezium.yml 
b/.github/workflows/fit_Tomcat_Debezium.yml
new file mode 100644
index 0000000000..c8b3e1bd2a
--- /dev/null
+++ b/.github/workflows/fit_Tomcat_Debezium.yml
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+name: "FIT Tomcat Debezium"
+
+on:
+  push:
+    branches: ['master', 'pr-*']
+  pull_request:
+    # The branches below must be a subset of the branches above
+    branches: [master]
+  schedule:
+    - cron: '0 13 * * 4'
+
+jobs:
+  fit_Tomcat_Debezium:
+    runs-on: ubuntu-latest
+
+    steps:
+    - name: Checkout repository
+      uses: actions/checkout@v4
+    - name: Setup Java JDK
+      uses: actions/setup-java@v4
+      with:
+        distribution: 'temurin'
+        java-version: 21
+    - name: Setup Maven
+      uses: stCarolas/setup-maven@v5
+      with:
+        maven-version: 3.9.6
+    - uses: actions/cache@v4
+      with:
+        path: ~/.m2/repository
+        key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
+        restore-keys: |
+          ${{ runner.os }}-maven-
+    - name: Build
+      run: mvn -U -T 1C -P 'skipTests,all'
+    - name: 'Tomcat / PostgreSQL / Debezium'
+      run: mvn -f fit/core-reference/pom.xml -P debezium-it 
-Dinvoker.streamLogs=true -Dmodernizer.skip=true -Drat.skip=true 
-Dcheckstyle.skip=true -Djacoco.skip=true
diff --git a/fit/core-reference/pom.xml b/fit/core-reference/pom.xml
index d5d8d413f5..56e81b97bb 100644
--- a/fit/core-reference/pom.xml
+++ b/fit/core-reference/pom.xml
@@ -1363,6 +1363,214 @@ under the License.
       </build>
     </profile>
 
+    <profile>
+      <id>debezium-it</id>
+
+      <dependencies>
+        <dependency>
+          <groupId>com.mysql</groupId>
+          <artifactId>mysql-connector-j</artifactId>
+          <scope>test</scope>
+        </dependency>
+      </dependencies>
+
+      <build>
+        <defaultGoal>clean verify</defaultGoal>
+
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-failsafe-plugin</artifactId>
+            <inherited>true</inherited>
+            <configuration>
+              <includes>
+                
<include>**/org/apache/syncope/fit/core/DebeziumITCase.java</include>
+              </includes>
+              <systemPropertyVariables>
+                
<KAFKA_BOOTSTRAP_SERVERS>${docker.container.kafka.ip}:9092</KAFKA_BOOTSTRAP_SERVERS>
+                <MYSQL_IP>${docker.container.mysql.ip}</MYSQL_IP>
+                <CONNECT_IP>${docker.container.connect.ip}</CONNECT_IP>
+              </systemPropertyVariables>
+            </configuration>
+          </plugin>
+
+          <plugin>
+            <groupId>io.fabric8</groupId>
+            <artifactId>docker-maven-plugin</artifactId>
+            <configuration>
+              <images>
+                <image>
+                  <alias>zookeeper</alias>
+                  <name>zookeeper:${zookeeper.version}</name>
+                  <run>
+                    <volumes>
+                      <bind>
+                        <volume>/etc/localtime:/etc/localtime:ro</volume>
+                      </bind>
+                    </volumes>
+                    <tmpfs>
+                      <mount>/datalog:rw</mount>
+                      <mount>/data:rw</mount>
+                      <mount>/logs:rw</mount>
+                    </tmpfs>
+                  </run>
+                </image>                
+                <image>
+                  <alias>kafka</alias>
+                  <name>quay.io/debezium/kafka:${debezium.version}</name>
+                  <run>
+                    <dependsOn>
+                      <container>zookeeper</container>
+                    </dependsOn>
+                    <links>
+                      <link>zookeeper</link>
+                    </links>
+                    <env>
+                      <ZOOKEEPER_CONNECT>zookeeper:2181</ZOOKEEPER_CONNECT>
+                    </env>
+                    <volumes>
+                      <bind>
+                        <volume>/etc/localtime:/etc/localtime:ro</volume>
+                      </bind>
+                    </volumes>
+                    <tmpfs>
+                      <mount>/kafka/config:rw</mount>
+                      <mount>/kafka/data:rw</mount>
+                      <mount>/kafka/logs:rw</mount>
+                    </tmpfs>
+                  </run>
+                </image>              
+                <image>
+                  <alias>mysql</alias>
+                  
<name>quay.io/debezium/example-mysql:${debezium.version}</name>
+                  <run>
+                    <ports> 
+                      <port>3306:3306</port>
+                    </ports>
+                    <env>
+                      <MYSQL_ROOT_PASSWORD>debezium</MYSQL_ROOT_PASSWORD>
+                      <MYSQL_USER>mysqluser</MYSQL_USER>
+                      <MYSQL_PASSWORD>mysqlpw</MYSQL_PASSWORD>
+                    </env>
+                    <volumes>
+                      <bind>
+                        <volume>/etc/localtime:/etc/localtime:ro</volume>
+                        
<volume>${basedir}/src/test/resources/debezium_mysql_init.sql:/docker-entrypoint-initdb.d/mysql_init.sql:ro</volume>
+                      </bind>
+                    </volumes>
+                    <tmpfs>
+                      <mount>/var/lib/mysql:rw</mount>
+                    </tmpfs>
+                    <wait>
+                      <log>MySQL init process done. Ready for start up.</log>
+                      <time>30000</time>
+                    </wait>
+                  </run>
+                </image>
+                <image>
+                  <alias>connect</alias>
+                  <name>quay.io/debezium/connect:${debezium.version}</name>
+                  <run>
+                    <dependsOn>
+                      <container>kafka</container>
+                      <container>mysql</container>
+                    </dependsOn>
+                    <links>
+                      <link>kafka</link>
+                      <link>mysql</link>
+                    </links>
+                    <env>
+                      <BOOTSTRAP_SERVERS>kafka:9092</BOOTSTRAP_SERVERS>
+                      <GROUP_ID>1</GROUP_ID>
+                      
<CONFIG_STORAGE_TOPIC>my_connect_configs</CONFIG_STORAGE_TOPIC>
+                      
<OFFSET_STORAGE_TOPIC>my_connect_offsets</OFFSET_STORAGE_TOPIC>
+                      
<STATUS_STORAGE_TOPIC>my_connect_statuses</STATUS_STORAGE_TOPIC>
+                    </env>
+                    <volumes>
+                      <bind>
+                        <volume>/etc/localtime:/etc/localtime:ro</volume>
+                      </bind>
+                    </volumes>
+                    <tmpfs>
+                      <mount>/kafka/config:rw</mount>
+                      <mount>/kafka/data:rw</mount>
+                      <mount>/kafka/logs:rw</mount>
+                    </tmpfs>
+                  </run>
+                </image>                
+                <image>
+                  <alias>kafka-ui</alias>
+                  <name>provectuslabs/kafka-ui</name>
+                  <run>
+                    <dependsOn>
+                      <container>kafka</container>
+                    </dependsOn>
+                    <links>
+                      <link>kafka</link>
+                    </links>
+                    <env>
+                      <KAFKA_CLUSTERS_0_NAME>local</KAFKA_CLUSTERS_0_NAME>
+                      
<KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS>kafka:9092</KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS>
+                    </env>
+                    <volumes>
+                      <bind>
+                        <volume>/etc/localtime:/etc/localtime:ro</volume>
+                      </bind>
+                    </volumes>
+                  </run>
+                </image>
+                <image>
+                  <alias>debezium-ui</alias>
+                  <name>quay.io/debezium/debezium-ui</name>
+                  <run>
+                    <dependsOn>
+                      <container>kafka</container>
+                      <container>connect</container>
+                    </dependsOn>
+                    <links>
+                      <link>kafka</link>
+                      <link>connect</link>
+                    </links>
+                    <env>
+                      
<KAFKA_CONNECT_URIS>http://connect:8083</KAFKA_CONNECT_URIS>
+                    </env>
+                    <volumes>
+                      <bind>
+                        <volume>/etc/localtime:/etc/localtime:ro</volume>
+                      </bind>
+                    </volumes>
+                  </run>
+                </image>                
+              </images>
+            </configuration>
+            <executions>
+              <execution>
+                <id>start-debezium</id>
+                <phase>pre-integration-test</phase>
+                <goals>
+                  <goal>start</goal>
+                </goals>
+              </execution>
+              <execution>
+                <id>stop-debezium</id>
+                <phase>post-integration-test</phase>
+                <goals>
+                  <goal>stop</goal>
+                  <goal>remove</goal>
+                </goals>
+              </execution>
+            </executions>
+          </plugin>
+
+          <plugin>
+            <groupId>org.codehaus.cargo</groupId>
+            <artifactId>cargo-maven3-plugin</artifactId>
+            <inherited>true</inherited>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+
     <profile>
       <id>skipTests</id>
 
diff --git 
a/fit/core-reference/src/main/java/org/apache/syncope/fit/core/reference/DebeziumLiveSyncDeltaMapper.java
 
b/fit/core-reference/src/main/java/org/apache/syncope/fit/core/reference/DebeziumLiveSyncDeltaMapper.java
new file mode 100644
index 0000000000..89c697c949
--- /dev/null
+++ 
b/fit/core-reference/src/main/java/org/apache/syncope/fit/core/reference/DebeziumLiveSyncDeltaMapper.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.syncope.fit.core.reference;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.syncope.common.lib.to.OrgUnit;
+import org.apache.syncope.common.lib.to.Provision;
+import org.apache.syncope.core.provisioning.api.LiveSyncDeltaMapper;
+import org.identityconnectors.framework.common.objects.AttributeBuilder;
+import org.identityconnectors.framework.common.objects.ConnectorObjectBuilder;
+import org.identityconnectors.framework.common.objects.LiveSyncDelta;
+import org.identityconnectors.framework.common.objects.ObjectClass;
+import org.identityconnectors.framework.common.objects.SyncDelta;
+import org.identityconnectors.framework.common.objects.SyncDeltaBuilder;
+import org.identityconnectors.framework.common.objects.SyncDeltaType;
+import org.identityconnectors.framework.common.objects.SyncToken;
+import org.identityconnectors.framework.common.objects.Uid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.CollectionUtils;
+
+public class DebeziumLiveSyncDeltaMapper implements LiveSyncDeltaMapper {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(DebeziumLiveSyncDeltaMapper.class);
+
+    private static final JsonMapper JSON_MAPPER = 
JsonMapper.builder().findAndAddModules().build();
+
+    @Override
+    public SyncDelta map(final LiveSyncDelta liveSyncDelta, final OrgUnit 
orgUnit) {
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    /**
+     * Maps a Debezium change event, delivered as a Kafka record, to a {@link SyncDelta}.
+     *
+     * The event JSON is taken from the {@code record.value} attribute and the sync token from
+     * {@code record.timestamp}; an event whose {@code after} element is missing or null is a deletion.
+     *
+     * @param liveSyncDelta delta carrying the {@code record.timestamp} and {@code record.value} attributes
+     * @param provision provision whose object class must match the one of the given delta
+     * @return the mapped delta; only {@link ObjectClass#ACCOUNT} is supported
+     * @throws IllegalArgumentException if the object class is unexpected or unsupported, or if timestamp,
+     * value, payload or row id cannot be found
+     * @throws IllegalStateException if the received value cannot be parsed as JSON
+     */
+    @Override
+    public SyncDelta map(final LiveSyncDelta liveSyncDelta, final Provision provision) {
+        if (!provision.getObjectClass().equals(liveSyncDelta.getObjectClass().getObjectClassValue())) {
+            throw new IllegalArgumentException("Expected " + provision.getObjectClass()
+                    + ", got " + liveSyncDelta.getObjectClass().getObjectClassValue());
+        }
+
+        long timestamp = Optional.ofNullable(liveSyncDelta.getObject().getAttributeByName("record.timestamp")).
+                filter(attr -> !CollectionUtils.isEmpty(attr.getValue())).
+                map(attr -> (Long) attr.getValue().getFirst()).
+                orElseThrow(() -> new IllegalArgumentException("No record.timestamp attribute found"));
+        String value = Optional.ofNullable(liveSyncDelta.getObject().getAttributeByName("record.value")).
+                filter(attr -> !CollectionUtils.isEmpty(attr.getValue())).
+                map(attr -> attr.getValue().getFirst().toString()).
+                orElseThrow(() -> new IllegalArgumentException("No record.value attribute found"));
+        LOG.debug("Received: timestamp {} / value {}", timestamp, value);
+
+        JsonNode payload;
+        try {
+            // path() never returns null: a missing payload is reported right below instead of causing an NPE
+            payload = JSON_MAPPER.readTree(value).path("payload");
+        } catch (IOException e) {
+            throw new IllegalStateException("Could not parse the received value as JSON", e);
+        }
+        if (payload.isMissingNode() || payload.isNull()) {
+            throw new IllegalArgumentException("No payload found in the received value");
+        }
+
+        if (!ObjectClass.ACCOUNT.equals(liveSyncDelta.getObjectClass())) {
+            throw new IllegalArgumentException("Unsupported ObjectClass: " + liveSyncDelta.getObjectClass());
+        }
+
+        // no usable "after" image means the row was deleted: it is then only described by "before"
+        SyncDeltaType syncDeltaType = payload.hasNonNull("after")
+                ? SyncDeltaType.CREATE_OR_UPDATE
+                : SyncDeltaType.DELETE;
+        String element = syncDeltaType == SyncDeltaType.DELETE ? "before" : "after";
+
+        // look the row image up once, and fail with a clear message if it carries no identifier
+        JsonNode row = payload.path(element);
+        if (!row.hasNonNull("id")) {
+            throw new IllegalArgumentException("No id found in the " + element + " element");
+        }
+        Uid uid = new Uid(row.get("id").asText());
+
+        ConnectorObjectBuilder connObjectBuilder = new ConnectorObjectBuilder().
+                setObjectClass(liveSyncDelta.getObjectClass()).
+                setUid(uid).
+                setName(uid.getUidValue());
+        if (syncDeltaType != SyncDeltaType.DELETE) {
+            // deletions only need the identifier; creations and updates also carry the row columns
+            String email = row.get("email").asText();
+            String firstName = row.get("first_name").asText();
+            String lastName = row.get("last_name").asText();
+            connObjectBuilder.
+                    addAttribute(AttributeBuilder.build("email", email)).
+                    addAttribute(AttributeBuilder.build("givenName", firstName)).
+                    addAttribute(AttributeBuilder.build("lastName", lastName)).
+                    addAttribute(AttributeBuilder.build("fullname", firstName + " " + lastName));
+        }
+
+        SyncDeltaBuilder syncDeltaBuilder = new SyncDeltaBuilder().
+                setToken(new SyncToken(timestamp)).
+                setDeltaType(syncDeltaType);
+        syncDeltaBuilder.setUid(uid);
+        syncDeltaBuilder.setObject(connObjectBuilder.build());
+
+        return syncDeltaBuilder.build();
+    }
+}
diff --git 
a/fit/core-reference/src/test/java/org/apache/syncope/fit/core/DebeziumITCase.java
 
b/fit/core-reference/src/test/java/org/apache/syncope/fit/core/DebeziumITCase.java
new file mode 100644
index 0000000000..d61efc3928
--- /dev/null
+++ 
b/fit/core-reference/src/test/java/org/apache/syncope/fit/core/DebeziumITCase.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.syncope.fit.core;
+
+import static org.assertj.core.api.Assumptions.assumeThatCollection;
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import jakarta.ws.rs.core.MediaType;
+import jakarta.ws.rs.core.Response;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import javax.sql.DataSource;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.cxf.jaxrs.client.WebClient;
+import org.apache.syncope.common.lib.SyncopeClientException;
+import org.apache.syncope.common.lib.SyncopeConstants;
+import org.apache.syncope.common.lib.to.ConnInstanceTO;
+import org.apache.syncope.common.lib.to.ImplementationTO;
+import org.apache.syncope.common.lib.to.LiveSyncTaskTO;
+import org.apache.syncope.common.lib.to.UserTO;
+import org.apache.syncope.common.lib.types.ClientExceptionType;
+import org.apache.syncope.common.lib.types.IdMImplementationType;
+import org.apache.syncope.common.lib.types.ImplementationEngine;
+import org.apache.syncope.common.lib.types.JobAction;
+import org.apache.syncope.common.lib.types.TaskType;
+import org.apache.syncope.common.rest.api.RESTHeaders;
+import org.apache.syncope.common.rest.api.beans.ExecSpecs;
+import org.apache.syncope.common.rest.api.service.TaskService;
+import org.apache.syncope.core.provisioning.java.pushpull.KafkaInboundActions;
+import org.apache.syncope.fit.AbstractITCase;
+import org.apache.syncope.fit.core.reference.DebeziumLiveSyncDeltaMapper;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.datasource.DriverManagerDataSource;
+
+class DebeziumITCase extends AbstractITCase {
+
+    @BeforeAll
+    public static void debeziumSetup() {
+        
assumeThatCollection(System.getProperties().keySet()).anyMatch("CONNECT_IP"::equals);
+
+        ImplementationTO liveSDM = null;
+        try {
+            liveSDM = IMPLEMENTATION_SERVICE.read(
+                    IdMImplementationType.LIVE_SYNC_DELTA_MAPPER, 
DebeziumLiveSyncDeltaMapper.class.getSimpleName());
+        } catch (SyncopeClientException e) {
+            if (e.getType().getResponseStatus() == Response.Status.NOT_FOUND) {
+                liveSDM = new ImplementationTO();
+                
liveSDM.setKey(DebeziumLiveSyncDeltaMapper.class.getSimpleName());
+                liveSDM.setEngine(ImplementationEngine.JAVA);
+                liveSDM.setType(IdMImplementationType.LIVE_SYNC_DELTA_MAPPER);
+                liveSDM.setBody(DebeziumLiveSyncDeltaMapper.class.getName());
+                Response response = IMPLEMENTATION_SERVICE.create(liveSDM);
+                liveSDM = IMPLEMENTATION_SERVICE.read(
+                        liveSDM.getType(), 
response.getHeaderString(RESTHeaders.RESOURCE_KEY));
+                assertNotNull(liveSDM);
+            }
+        }
+        assertNotNull(liveSDM);
+
+        ImplementationTO kafkaInboundActions = null;
+        try {
+            kafkaInboundActions = IMPLEMENTATION_SERVICE.read(
+                    IdMImplementationType.INBOUND_ACTIONS, 
KafkaInboundActions.class.getSimpleName());
+        } catch (SyncopeClientException e) {
+            if (e.getType().getResponseStatus() == Response.Status.NOT_FOUND) {
+                kafkaInboundActions = new ImplementationTO();
+                
kafkaInboundActions.setKey(KafkaInboundActions.class.getSimpleName());
+                kafkaInboundActions.setEngine(ImplementationEngine.JAVA);
+                
kafkaInboundActions.setType(IdMImplementationType.INBOUND_ACTIONS);
+                
kafkaInboundActions.setBody(KafkaInboundActions.class.getName());
+                Response response = 
IMPLEMENTATION_SERVICE.create(kafkaInboundActions);
+                kafkaInboundActions = IMPLEMENTATION_SERVICE.read(
+                        kafkaInboundActions.getType(), 
response.getHeaderString(RESTHeaders.RESOURCE_KEY));
+                assertNotNull(kafkaInboundActions);
+            }
+        }
+        assertNotNull(kafkaInboundActions);
+
+        ConnInstanceTO connector = SerializationUtils.clone(
+                CONNECTOR_SERVICE.read("01938bdf-7ac6-7149-a103-3ec9e74cc824", 
null));
+        connector.getConf().stream().filter(p -> 
"bootstrapServers".equals(p.getSchema().getName())).findFirst().
+                ifPresent(p -> {
+                    p.getValues().clear();
+                    
p.getValues().add(System.getProperty("KAFKA_BOOTSTRAP_SERVERS"));
+                });
+        connector.getConf().stream().filter(p -> 
"accountTopic".equals(p.getSchema().getName())).findFirst().
+                ifPresent(p -> {
+                    p.getValues().clear();
+                    p.getValues().add("dbserver1.inventory.customers");
+                });
+        connector.getConf().removeIf(p -> 
"groupTopic".equals(p.getSchema().getName()));
+
+        CONNECTOR_SERVICE.update(connector);
+    }
+
+    @Test
+    void liveSync() {
+        
assumeThatCollection(System.getProperties().keySet()).anyMatch("CONNECT_IP"::equals);
+
+        // 0. datasource for Debezium
+        DataSource dataSource = new DriverManagerDataSource("jdbc:mysql://" + 
System.getProperty("MYSQL_IP") + ":3306/"
+                + 
"inventory?useSSL=false&allowPublicKeyRetrieval=true&characterEncoding=UTF-8", 
"syncope", "syncope");
+        JdbcTemplate debezium = new JdbcTemplate(dataSource);
+
+        // 1. create and execute the live sync task
+        LiveSyncTaskTO task = new LiveSyncTaskTO();
+        task.setName("Debezium LiveSync");
+        task.setDestinationRealm(SyncopeConstants.ROOT_REALM);
+        task.setResource(RESOURCE_NAME_KAFKA);
+        
task.setLiveSyncDeltaMapper(DebeziumLiveSyncDeltaMapper.class.getSimpleName());
+        task.getActions().add(KafkaInboundActions.class.getSimpleName());
+        task.setPerformCreate(true);
+        task.setPerformUpdate(true);
+        task.setPerformDelete(true);
+
+        Response response = TASK_SERVICE.create(TaskType.LIVE_SYNC, task);
+        LiveSyncTaskTO actual = getObject(response.getLocation(), 
TaskService.class, LiveSyncTaskTO.class);
+        assertNotNull(actual);
+
+        task = TASK_SERVICE.read(TaskType.LIVE_SYNC, actual.getKey(), true);
+        assertNotNull(task);
+        assertEquals(actual.getKey(), task.getKey());
+        assertNotNull(actual.getJobDelegate());
+        assertEquals(actual.getLiveSyncDeltaMapper(), 
task.getLiveSyncDeltaMapper());
+
+        TASK_SERVICE.execute(new 
ExecSpecs.Builder().key(task.getKey()).build());
+
+        // 2. create the Debezium connector
+        WebClient.create("http://" + System.getProperty("CONNECT_IP") + 
":8083/connectors").
+                accept(MediaType.APPLICATION_JSON).
+                type(MediaType.APPLICATION_JSON).
+                post("""
+                     {
+                       "name": "inventory-connector",
+                       "config": {
+                         "connector.class": 
"io.debezium.connector.mysql.MySqlConnector",
+                         "tasks.max": "1",
+                         "database.hostname": "mysql",
+                         "database.port": "3306",
+                         "database.user": "debezium",
+                         "database.password": "dbz",
+                         "database.server.id": "184054",
+                         "topic.prefix": "dbserver1",
+                         "database.include.list": "inventory",
+                         "schema.history.internal.kafka.bootstrap.servers": 
"kafka:9092",
+                         "schema.history.internal.kafka.topic": 
"schema-changes.inventory"
+                       }
+                     }                     
+                     """);
+
+        try {
+            // 3. check that the initial customers were pulled
+            UserTO user = await().atMost(MAX_WAIT_SECONDS, 
TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(
+                    () -> {
+                        try {
+                            return USER_SERVICE.read("1004");
+                        } catch (SyncopeClientException e) {
+                            return null;
+                        }
+                    }, Objects::nonNull);
+            assertEquals("[email protected]", 
user.getPlainAttr("email").orElseThrow().getValues().getFirst());
+            assertEquals("[email protected]", 
user.getPlainAttr("userId").orElseThrow().getValues().getFirst());
+            assertEquals("Anne", 
user.getPlainAttr("firstname").orElseThrow().getValues().getFirst());
+            assertEquals("Kretchmar", 
user.getPlainAttr("surname").orElseThrow().getValues().getFirst());
+            assertEquals("Anne Kretchmar", 
user.getPlainAttr("fullname").orElseThrow().getValues().getFirst());
+
+            // 4. create new customer
+            debezium.update("INSERT INTO customers VALUES (1005,?,?,?)", 
"John", "Doe", "[email protected]");
+
+            user = await().atMost(MAX_WAIT_SECONDS, 
TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(
+                    () -> {
+                        try {
+                            return USER_SERVICE.read("1005");
+                        } catch (SyncopeClientException e) {
+                            return null;
+                        }
+                    }, Objects::nonNull);
+            assertEquals("[email protected]", 
user.getPlainAttr("email").orElseThrow().getValues().getFirst());
+            assertEquals("[email protected]", 
user.getPlainAttr("userId").orElseThrow().getValues().getFirst());
+            assertEquals("John", 
user.getPlainAttr("firstname").orElseThrow().getValues().getFirst());
+            assertEquals("Doe", 
user.getPlainAttr("surname").orElseThrow().getValues().getFirst());
+            assertEquals("John Doe", 
user.getPlainAttr("fullname").orElseThrow().getValues().getFirst());
+
+            // 5. update existing customer
+            debezium.update("UPDATE customers SET email=? WHERE id=?", 
"[email protected]", 1004);
+
+            await().atMost(MAX_WAIT_SECONDS, TimeUnit.SECONDS).
+                    pollInterval(1, TimeUnit.SECONDS).until(() -> 
"[email protected]".
+                    
equals(USER_SERVICE.read("1004").getPlainAttr("email").orElseThrow().getValues().getFirst()));
+            user = USER_SERVICE.read("1004");
+            assertEquals("[email protected]", 
user.getPlainAttr("userId").orElseThrow().getValues().getFirst());
+            assertEquals("Anne", 
user.getPlainAttr("firstname").orElseThrow().getValues().getFirst());
+            assertEquals("Kretchmar", 
user.getPlainAttr("surname").orElseThrow().getValues().getFirst());
+            assertEquals("Anne Kretchmar", 
user.getPlainAttr("fullname").orElseThrow().getValues().getFirst());
+
+            // 6. delete customer
+            debezium.update("DELETE FROM customers WHERE id=?", 1005);
+
+            SyncopeClientException sce = await().atMost(MAX_WAIT_SECONDS, 
TimeUnit.SECONDS).
+                    pollInterval(1, TimeUnit.SECONDS).until(() -> {
+                try {
+                    USER_SERVICE.read("1005");
+                    return null;
+                } catch (SyncopeClientException e) {
+                    return e;
+                }
+            }, Objects::nonNull);
+            assertEquals(ClientExceptionType.NotFound, sce.getType());
+        } finally {
+            // finally stop live syncing
+            TASK_SERVICE.actionJob(task.getKey(), JobAction.STOP);
+        }
+    }
+}
diff --git 
a/fit/core-reference/src/test/java/org/apache/syncope/fit/core/LiveSyncITCase.java
 
b/fit/core-reference/src/test/java/org/apache/syncope/fit/core/LiveSyncITCase.java
index 158670a2ea..455c0fa40e 100644
--- 
a/fit/core-reference/src/test/java/org/apache/syncope/fit/core/LiveSyncITCase.java
+++ 
b/fit/core-reference/src/test/java/org/apache/syncope/fit/core/LiveSyncITCase.java
@@ -242,9 +242,9 @@ public class LiveSyncITCase extends AbstractITCase {
                     }, Objects::nonNull);
             assertEquals(email, 
user.getPlainAttr("email").orElseThrow().getValues().getFirst());
             assertEquals(email, 
user.getPlainAttr("userId").orElseThrow().getValues().getFirst());
-            assertEquals(email, 
user.getPlainAttr("userId").orElseThrow().getValues().getFirst());
             assertEquals("LiveSync", 
user.getPlainAttr("firstname").orElseThrow().getValues().getFirst());
             assertEquals("LiveSync", 
user.getPlainAttr("surname").orElseThrow().getValues().getFirst());
+            assertEquals("LiveSync LiveSync", 
user.getPlainAttr("fullname").orElseThrow().getValues().getFirst());
 
             await().atMost(MAX_WAIT_SECONDS, TimeUnit.SECONDS).pollInterval(1, 
TimeUnit.SECONDS).until(
                     () -> TASK_SERVICE.read(TaskType.LIVE_SYNC, 
actual.getKey(), true).getExecutions(),
diff --git a/fit/core-reference/src/test/resources/debezium_mysql_init.sql 
b/fit/core-reference/src/test/resources/debezium_mysql_init.sql
new file mode 100644
index 0000000000..53682416ea
--- /dev/null
+++ b/fit/core-reference/src/test/resources/debezium_mysql_init.sql
@@ -0,0 +1,20 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+
+--  http://www.apache.org/licenses/LICENSE-2.0
+
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+CREATE USER 'syncope'@'%' IDENTIFIED WITH caching_sha2_password BY 'syncope';
+GRANT ALL PRIVILEGES ON *.* TO 'syncope'@'%';
+FLUSH PRIVILEGES;
diff --git a/fit/wa-reference/pom.xml b/fit/wa-reference/pom.xml
index 6173c96502..32af56830a 100644
--- a/fit/wa-reference/pom.xml
+++ b/fit/wa-reference/pom.xml
@@ -184,7 +184,7 @@ under the License.
             </image>
             <image>
               <alias>openfga</alias>
-              <name>openfga/openfga:${docker.openfga.version}</name>
+              <name>openfga/openfga:${openfga.version}</name>
               <run>
                 <cmd>run</cmd>
               </run>
diff --git a/pom.xml b/pom.xml
index 23af9d0656..c350252473 100644
--- a/pom.xml
+++ b/pom.xml
@@ -432,7 +432,9 @@ under the License.
     <opensearch.version>2.19.1</opensearch.version>
     <opensearch-java.version>2.22.0</opensearch-java.version>
 
-    <docker.openfga.version>v1</docker.openfga.version>
+    <openfga.version>v1</openfga.version>
+
+    <debezium.version>3.0</debezium.version>
 
     <commons-lang3.version>3.17.0</commons-lang3.version>
     <commons-jexl.version>3.4.0</commons-jexl.version>

Reply via email to