This is an automated email from the ASF dual-hosted git repository.
ilgrosso pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/syncope.git
The following commit(s) were added to refs/heads/master by this push:
new 284c17df3d [SYNCOPE-1617] Debezium with LiveSync (#1025)
284c17df3d is described below
commit 284c17df3df1593b2947679d63e84a0f5a4cc138
Author: Francesco Chicchiriccò <[email protected]>
AuthorDate: Wed Mar 12 12:46:15 2025 +0100
[SYNCOPE-1617] Debezium with LiveSync (#1025)
---
.github/workflows/fit_Tomcat_Debezium.yml | 53 +++++
fit/core-reference/pom.xml | 208 ++++++++++++++++++
.../reference/DebeziumLiveSyncDeltaMapper.java | 115 ++++++++++
.../apache/syncope/fit/core/DebeziumITCase.java | 234 +++++++++++++++++++++
.../apache/syncope/fit/core/LiveSyncITCase.java | 2 +-
.../src/test/resources/debezium_mysql_init.sql | 20 ++
fit/wa-reference/pom.xml | 2 +-
pom.xml | 4 +-
8 files changed, 635 insertions(+), 3 deletions(-)
diff --git a/.github/workflows/fit_Tomcat_Debezium.yml
b/.github/workflows/fit_Tomcat_Debezium.yml
new file mode 100644
index 0000000000..c8b3e1bd2a
--- /dev/null
+++ b/.github/workflows/fit_Tomcat_Debezium.yml
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License
+name: "FIT Tomcat Debezium"
+
+on:
+ push:
+ branches: ['master', 'pr-*']
+ pull_request:
+ # The branches below must be a subset of the branches above
+ branches: [master]
+ schedule:
+ - cron: '0 13 * * 4'
+
+jobs:
+ fit_Tomcat_Debezium:
+ runs-on: ubuntu-latest
+
+ steps:
+ - name: Checkout repository
+ uses: actions/checkout@v4
+ - name: Setup Java JDK
+ uses: actions/setup-java@v4
+ with:
+ distribution: 'temurin'
+ java-version: 21
+ - name: Setup Maven
+ uses: stCarolas/setup-maven@v5
+ with:
+ maven-version: 3.9.6
+ - uses: actions/cache@v4
+ with:
+ path: ~/.m2/repository
+ key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
+ restore-keys: |
+ ${{ runner.os }}-maven-
+ - name: Build
+ run: mvn -U -T 1C -P 'skipTests,all'
+ - name: 'Tomcat / PostgreSQL / Debezium'
+ run: mvn -f fit/core-reference/pom.xml -P debezium-it
-Dinvoker.streamLogs=true -Dmodernizer.skip=true -Drat.skip=true
-Dcheckstyle.skip=true -Djacoco.skip=true
diff --git a/fit/core-reference/pom.xml b/fit/core-reference/pom.xml
index d5d8d413f5..56e81b97bb 100644
--- a/fit/core-reference/pom.xml
+++ b/fit/core-reference/pom.xml
@@ -1363,6 +1363,214 @@ under the License.
</build>
</profile>
+ <profile>
+ <id>debezium-it</id>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.mysql</groupId>
+ <artifactId>mysql-connector-j</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <defaultGoal>clean verify</defaultGoal>
+
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <inherited>true</inherited>
+ <configuration>
+ <includes>
+
<include>**/org/apache/syncope/fit/core/DebeziumITCase.java</include>
+ </includes>
+ <systemPropertyVariables>
+
<KAFKA_BOOTSTRAP_SERVERS>${docker.container.kafka.ip}:9092</KAFKA_BOOTSTRAP_SERVERS>
+ <MYSQL_IP>${docker.container.mysql.ip}</MYSQL_IP>
+ <CONNECT_IP>${docker.container.connect.ip}</CONNECT_IP>
+ </systemPropertyVariables>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>io.fabric8</groupId>
+ <artifactId>docker-maven-plugin</artifactId>
+ <configuration>
+ <images>
+ <image>
+ <alias>zookeeper</alias>
+ <name>zookeeper:${zookeeper.version}</name>
+ <run>
+ <volumes>
+ <bind>
+ <volume>/etc/localtime:/etc/localtime:ro</volume>
+ </bind>
+ </volumes>
+ <tmpfs>
+ <mount>/datalog:rw</mount>
+ <mount>/data:rw</mount>
+ <mount>/logs:rw</mount>
+ </tmpfs>
+ </run>
+ </image>
+ <image>
+ <alias>kafka</alias>
+ <name>quay.io/debezium/kafka:${debezium.version}</name>
+ <run>
+ <dependsOn>
+ <container>zookeeper</container>
+ </dependsOn>
+ <links>
+ <link>zookeeper</link>
+ </links>
+ <env>
+ <ZOOKEEPER_CONNECT>zookeeper:2181</ZOOKEEPER_CONNECT>
+ </env>
+ <volumes>
+ <bind>
+ <volume>/etc/localtime:/etc/localtime:ro</volume>
+ </bind>
+ </volumes>
+ <tmpfs>
+ <mount>/kafka/config:rw</mount>
+ <mount>/kafka/data:rw</mount>
+ <mount>/kafka/logs:rw</mount>
+ </tmpfs>
+ </run>
+ </image>
+ <image>
+ <alias>mysql</alias>
+
<name>quay.io/debezium/example-mysql:${debezium.version}</name>
+ <run>
+ <ports>
+ <port>3306:3306</port>
+ </ports>
+ <env>
+ <MYSQL_ROOT_PASSWORD>debezium</MYSQL_ROOT_PASSWORD>
+ <MYSQL_USER>mysqluser</MYSQL_USER>
+ <MYSQL_PASSWORD>mysqlpw</MYSQL_PASSWORD>
+ </env>
+ <volumes>
+ <bind>
+ <volume>/etc/localtime:/etc/localtime:ro</volume>
+
<volume>${basedir}/src/test/resources/debezium_mysql_init.sql:/docker-entrypoint-initdb.d/mysql_init.sql:ro</volume>
+ </bind>
+ </volumes>
+ <tmpfs>
+ <mount>/var/lib/mysql:rw</mount>
+ </tmpfs>
+ <wait>
+ <log>MySQL init process done. Ready for start up.</log>
+ <time>30000</time>
+ </wait>
+ </run>
+ </image>
+ <image>
+ <alias>connect</alias>
+ <name>quay.io/debezium/connect:${debezium.version}</name>
+ <run>
+ <dependsOn>
+ <container>kafka</container>
+ <container>mysql</container>
+ </dependsOn>
+ <links>
+ <link>kafka</link>
+ <link>mysql</link>
+ </links>
+ <env>
+ <BOOTSTRAP_SERVERS>kafka:9092</BOOTSTRAP_SERVERS>
+ <GROUP_ID>1</GROUP_ID>
+
<CONFIG_STORAGE_TOPIC>my_connect_configs</CONFIG_STORAGE_TOPIC>
+
<OFFSET_STORAGE_TOPIC>my_connect_offsets</OFFSET_STORAGE_TOPIC>
+
<STATUS_STORAGE_TOPIC>my_connect_statuses</STATUS_STORAGE_TOPIC>
+ </env>
+ <volumes>
+ <bind>
+ <volume>/etc/localtime:/etc/localtime:ro</volume>
+ </bind>
+ </volumes>
+ <tmpfs>
+ <mount>/kafka/config:rw</mount>
+ <mount>/kafka/data:rw</mount>
+ <mount>/kafka/logs:rw</mount>
+ </tmpfs>
+ </run>
+ </image>
+ <image>
+ <alias>kafka-ui</alias>
+ <name>provectuslabs/kafka-ui</name>
+ <run>
+ <dependsOn>
+ <container>kafka</container>
+ </dependsOn>
+ <links>
+ <link>kafka</link>
+ </links>
+ <env>
+ <KAFKA_CLUSTERS_0_NAME>local</KAFKA_CLUSTERS_0_NAME>
+
<KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS>kafka:9092</KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS>
+ </env>
+ <volumes>
+ <bind>
+ <volume>/etc/localtime:/etc/localtime:ro</volume>
+ </bind>
+ </volumes>
+ </run>
+ </image>
+ <image>
+ <alias>debezium-ui</alias>
+ <name>quay.io/debezium/debezium-ui</name>
+ <run>
+ <dependsOn>
+ <container>kafka</container>
+ <container>connect</container>
+ </dependsOn>
+ <links>
+ <link>kafka</link>
+ <link>connect</link>
+ </links>
+ <env>
+
<KAFKA_CONNECT_URIS>http://connect:8083</KAFKA_CONNECT_URIS>
+ </env>
+ <volumes>
+ <bind>
+ <volume>/etc/localtime:/etc/localtime:ro</volume>
+ </bind>
+ </volumes>
+ </run>
+ </image>
+ </images>
+ </configuration>
+ <executions>
+ <execution>
+ <id>start-debezium</id>
+ <phase>pre-integration-test</phase>
+ <goals>
+ <goal>start</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>stop-debezium</id>
+ <phase>post-integration-test</phase>
+ <goals>
+ <goal>stop</goal>
+ <goal>remove</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.codehaus.cargo</groupId>
+ <artifactId>cargo-maven3-plugin</artifactId>
+ <inherited>true</inherited>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+
<profile>
<id>skipTests</id>
diff --git
a/fit/core-reference/src/main/java/org/apache/syncope/fit/core/reference/DebeziumLiveSyncDeltaMapper.java
b/fit/core-reference/src/main/java/org/apache/syncope/fit/core/reference/DebeziumLiveSyncDeltaMapper.java
new file mode 100644
index 0000000000..89c697c949
--- /dev/null
+++
b/fit/core-reference/src/main/java/org/apache/syncope/fit/core/reference/DebeziumLiveSyncDeltaMapper.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.syncope.fit.core.reference;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.syncope.common.lib.to.OrgUnit;
+import org.apache.syncope.common.lib.to.Provision;
+import org.apache.syncope.core.provisioning.api.LiveSyncDeltaMapper;
+import org.identityconnectors.framework.common.objects.AttributeBuilder;
+import org.identityconnectors.framework.common.objects.ConnectorObjectBuilder;
+import org.identityconnectors.framework.common.objects.LiveSyncDelta;
+import org.identityconnectors.framework.common.objects.ObjectClass;
+import org.identityconnectors.framework.common.objects.SyncDelta;
+import org.identityconnectors.framework.common.objects.SyncDeltaBuilder;
+import org.identityconnectors.framework.common.objects.SyncDeltaType;
+import org.identityconnectors.framework.common.objects.SyncToken;
+import org.identityconnectors.framework.common.objects.Uid;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.CollectionUtils;
+
+public class DebeziumLiveSyncDeltaMapper implements LiveSyncDeltaMapper {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(DebeziumLiveSyncDeltaMapper.class);
+
+ private static final JsonMapper JSON_MAPPER =
JsonMapper.builder().findAndAddModules().build();
+
+ @Override
+ public SyncDelta map(final LiveSyncDelta liveSyncDelta, final OrgUnit
orgUnit) {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public SyncDelta map(final LiveSyncDelta liveSyncDelta, final Provision
provision) {
+ if
(!provision.getObjectClass().equals(liveSyncDelta.getObjectClass().getObjectClassValue()))
{
+ throw new IllegalArgumentException("Expected " +
provision.getObjectClass()
+ + ", got " +
liveSyncDelta.getObjectClass().getObjectClassValue());
+ }
+
+ long timestamp =
Optional.ofNullable(liveSyncDelta.getObject().getAttributeByName("record.timestamp")).
+ filter(attr -> !CollectionUtils.isEmpty(attr.getValue())).
+ map(attr -> (Long) attr.getValue().getFirst()).
+ orElseThrow(() -> new IllegalArgumentException("No
record.timestamp attribute found"));
+ String value =
Optional.ofNullable(liveSyncDelta.getObject().getAttributeByName("record.value")).
+ filter(attr -> !CollectionUtils.isEmpty(attr.getValue())).
+ map(attr -> attr.getValue().getFirst().toString()).
+ orElseThrow(() -> new IllegalArgumentException("No
record.value attribute found"));
+ LOG.debug("Received: timestamp {} / value {}", timestamp, value);
+
+ JsonNode payload;
+ try {
+ payload = JSON_MAPPER.readTree(value).get("payload");
+ } catch (IOException e) {
+ throw new IllegalStateException("Could to parse the received value
as JSON", e);
+ }
+
+ SyncDeltaType syncDeltaType = !payload.has("after") ||
payload.get("after").isNull()
+ ? SyncDeltaType.DELETE
+ : SyncDeltaType.CREATE_OR_UPDATE;
+
+ SyncDeltaBuilder syncDeltaBuilder = new SyncDeltaBuilder().
+ setToken(new SyncToken(timestamp)).
+ setDeltaType(syncDeltaType);
+ if (ObjectClass.ACCOUNT.equals(liveSyncDelta.getObjectClass())) {
+ String element = syncDeltaType == SyncDeltaType.DELETE
+ ? "before" : "after";
+
+ Uid uid = new Uid(payload.get(element).get("id").asText());
+ syncDeltaBuilder.setUid(uid);
+
+ ConnectorObjectBuilder connObjectBuilder = new
ConnectorObjectBuilder().
+ setObjectClass(liveSyncDelta.getObjectClass()).
+ setUid(uid).
+ setName(uid.getUidValue());
+ if (syncDeltaType != SyncDeltaType.DELETE) {
+ connObjectBuilder.
+ addAttribute(AttributeBuilder.build(
+ "email",
payload.get(element).get("email").asText())).
+ addAttribute(AttributeBuilder.build(
+ "givenName",
payload.get(element).get("first_name").asText())).
+ addAttribute(AttributeBuilder.build(
+ "lastName",
payload.get(element).get("last_name").asText())).
+ addAttribute(AttributeBuilder.build(
+ "fullname",
+
payload.get(element).get("first_name").asText() + " "
+ +
payload.get(element).get("last_name").asText()));
+ }
+
+ syncDeltaBuilder.setObject(connObjectBuilder.build());
+ } else {
+ throw new IllegalArgumentException("Unsupported ObjectClass: " +
liveSyncDelta.getObjectClass());
+ }
+
+ return syncDeltaBuilder.build();
+ }
+}
diff --git
a/fit/core-reference/src/test/java/org/apache/syncope/fit/core/DebeziumITCase.java
b/fit/core-reference/src/test/java/org/apache/syncope/fit/core/DebeziumITCase.java
new file mode 100644
index 0000000000..d61efc3928
--- /dev/null
+++
b/fit/core-reference/src/test/java/org/apache/syncope/fit/core/DebeziumITCase.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.syncope.fit.core;
+
+import static org.assertj.core.api.Assumptions.assumeThatCollection;
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import jakarta.ws.rs.core.MediaType;
+import jakarta.ws.rs.core.Response;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import javax.sql.DataSource;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.cxf.jaxrs.client.WebClient;
+import org.apache.syncope.common.lib.SyncopeClientException;
+import org.apache.syncope.common.lib.SyncopeConstants;
+import org.apache.syncope.common.lib.to.ConnInstanceTO;
+import org.apache.syncope.common.lib.to.ImplementationTO;
+import org.apache.syncope.common.lib.to.LiveSyncTaskTO;
+import org.apache.syncope.common.lib.to.UserTO;
+import org.apache.syncope.common.lib.types.ClientExceptionType;
+import org.apache.syncope.common.lib.types.IdMImplementationType;
+import org.apache.syncope.common.lib.types.ImplementationEngine;
+import org.apache.syncope.common.lib.types.JobAction;
+import org.apache.syncope.common.lib.types.TaskType;
+import org.apache.syncope.common.rest.api.RESTHeaders;
+import org.apache.syncope.common.rest.api.beans.ExecSpecs;
+import org.apache.syncope.common.rest.api.service.TaskService;
+import org.apache.syncope.core.provisioning.java.pushpull.KafkaInboundActions;
+import org.apache.syncope.fit.AbstractITCase;
+import org.apache.syncope.fit.core.reference.DebeziumLiveSyncDeltaMapper;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.datasource.DriverManagerDataSource;
+
+class DebeziumITCase extends AbstractITCase {
+
+ @BeforeAll
+ public static void debeziumSetup() {
+
assumeThatCollection(System.getProperties().keySet()).anyMatch("CONNECT_IP"::equals);
+
+ ImplementationTO liveSDM = null;
+ try {
+ liveSDM = IMPLEMENTATION_SERVICE.read(
+ IdMImplementationType.LIVE_SYNC_DELTA_MAPPER,
DebeziumLiveSyncDeltaMapper.class.getSimpleName());
+ } catch (SyncopeClientException e) {
+ if (e.getType().getResponseStatus() == Response.Status.NOT_FOUND) {
+ liveSDM = new ImplementationTO();
+
liveSDM.setKey(DebeziumLiveSyncDeltaMapper.class.getSimpleName());
+ liveSDM.setEngine(ImplementationEngine.JAVA);
+ liveSDM.setType(IdMImplementationType.LIVE_SYNC_DELTA_MAPPER);
+ liveSDM.setBody(DebeziumLiveSyncDeltaMapper.class.getName());
+ Response response = IMPLEMENTATION_SERVICE.create(liveSDM);
+ liveSDM = IMPLEMENTATION_SERVICE.read(
+ liveSDM.getType(),
response.getHeaderString(RESTHeaders.RESOURCE_KEY));
+ assertNotNull(liveSDM);
+ }
+ }
+ assertNotNull(liveSDM);
+
+ ImplementationTO kafkaInboundActions = null;
+ try {
+ kafkaInboundActions = IMPLEMENTATION_SERVICE.read(
+ IdMImplementationType.INBOUND_ACTIONS,
KafkaInboundActions.class.getSimpleName());
+ } catch (SyncopeClientException e) {
+ if (e.getType().getResponseStatus() == Response.Status.NOT_FOUND) {
+ kafkaInboundActions = new ImplementationTO();
+
kafkaInboundActions.setKey(KafkaInboundActions.class.getSimpleName());
+ kafkaInboundActions.setEngine(ImplementationEngine.JAVA);
+
kafkaInboundActions.setType(IdMImplementationType.INBOUND_ACTIONS);
+
kafkaInboundActions.setBody(KafkaInboundActions.class.getName());
+ Response response =
IMPLEMENTATION_SERVICE.create(kafkaInboundActions);
+ kafkaInboundActions = IMPLEMENTATION_SERVICE.read(
+ kafkaInboundActions.getType(),
response.getHeaderString(RESTHeaders.RESOURCE_KEY));
+ assertNotNull(kafkaInboundActions);
+ }
+ }
+ assertNotNull(kafkaInboundActions);
+
+ ConnInstanceTO connector = SerializationUtils.clone(
+ CONNECTOR_SERVICE.read("01938bdf-7ac6-7149-a103-3ec9e74cc824",
null));
+ connector.getConf().stream().filter(p ->
"bootstrapServers".equals(p.getSchema().getName())).findFirst().
+ ifPresent(p -> {
+ p.getValues().clear();
+
p.getValues().add(System.getProperty("KAFKA_BOOTSTRAP_SERVERS"));
+ });
+ connector.getConf().stream().filter(p ->
"accountTopic".equals(p.getSchema().getName())).findFirst().
+ ifPresent(p -> {
+ p.getValues().clear();
+ p.getValues().add("dbserver1.inventory.customers");
+ });
+ connector.getConf().removeIf(p ->
"groupTopic".equals(p.getSchema().getName()));
+
+ CONNECTOR_SERVICE.update(connector);
+ }
+
+ @Test
+ void liveSync() {
+
assumeThatCollection(System.getProperties().keySet()).anyMatch("CONNECT_IP"::equals);
+
+ // 0. datasource for Debezium
+ DataSource dataSource = new DriverManagerDataSource("jdbc:mysql://" +
System.getProperty("MYSQL_IP") + ":3306/"
+ +
"inventory?useSSL=false&allowPublicKeyRetrieval=true&characterEncoding=UTF-8",
"syncope", "syncope");
+ JdbcTemplate debezium = new JdbcTemplate(dataSource);
+
+ // 1. create and execute the live sync task
+ LiveSyncTaskTO task = new LiveSyncTaskTO();
+ task.setName("Debezium LiveSync");
+ task.setDestinationRealm(SyncopeConstants.ROOT_REALM);
+ task.setResource(RESOURCE_NAME_KAFKA);
+
task.setLiveSyncDeltaMapper(DebeziumLiveSyncDeltaMapper.class.getSimpleName());
+ task.getActions().add(KafkaInboundActions.class.getSimpleName());
+ task.setPerformCreate(true);
+ task.setPerformUpdate(true);
+ task.setPerformDelete(true);
+
+ Response response = TASK_SERVICE.create(TaskType.LIVE_SYNC, task);
+ LiveSyncTaskTO actual = getObject(response.getLocation(),
TaskService.class, LiveSyncTaskTO.class);
+ assertNotNull(actual);
+
+ task = TASK_SERVICE.read(TaskType.LIVE_SYNC, actual.getKey(), true);
+ assertNotNull(task);
+ assertEquals(actual.getKey(), task.getKey());
+ assertNotNull(actual.getJobDelegate());
+ assertEquals(actual.getLiveSyncDeltaMapper(),
task.getLiveSyncDeltaMapper());
+
+ TASK_SERVICE.execute(new
ExecSpecs.Builder().key(task.getKey()).build());
+
+ // 2. create the Debezium connector
+ WebClient.create("http://" + System.getProperty("CONNECT_IP") +
":8083/connectors").
+ accept(MediaType.APPLICATION_JSON).
+ type(MediaType.APPLICATION_JSON).
+ post("""
+ {
+ "name": "inventory-connector",
+ "config": {
+ "connector.class":
"io.debezium.connector.mysql.MySqlConnector",
+ "tasks.max": "1",
+ "database.hostname": "mysql",
+ "database.port": "3306",
+ "database.user": "debezium",
+ "database.password": "dbz",
+ "database.server.id": "184054",
+ "topic.prefix": "dbserver1",
+ "database.include.list": "inventory",
+ "schema.history.internal.kafka.bootstrap.servers":
"kafka:9092",
+ "schema.history.internal.kafka.topic":
"schema-changes.inventory"
+ }
+ }
+ """);
+
+ try {
+ // 3. check that the initial customers were pulled
+ UserTO user = await().atMost(MAX_WAIT_SECONDS,
TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(
+ () -> {
+ try {
+ return USER_SERVICE.read("1004");
+ } catch (SyncopeClientException e) {
+ return null;
+ }
+ }, Objects::nonNull);
+ assertEquals("[email protected]",
user.getPlainAttr("email").orElseThrow().getValues().getFirst());
+ assertEquals("[email protected]",
user.getPlainAttr("userId").orElseThrow().getValues().getFirst());
+ assertEquals("Anne",
user.getPlainAttr("firstname").orElseThrow().getValues().getFirst());
+ assertEquals("Kretchmar",
user.getPlainAttr("surname").orElseThrow().getValues().getFirst());
+ assertEquals("Anne Kretchmar",
user.getPlainAttr("fullname").orElseThrow().getValues().getFirst());
+
+ // 4. create new customer
+ debezium.update("INSERT INTO customers VALUES (1005,?,?,?)",
"John", "Doe", "[email protected]");
+
+ user = await().atMost(MAX_WAIT_SECONDS,
TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(
+ () -> {
+ try {
+ return USER_SERVICE.read("1005");
+ } catch (SyncopeClientException e) {
+ return null;
+ }
+ }, Objects::nonNull);
+ assertEquals("[email protected]",
user.getPlainAttr("email").orElseThrow().getValues().getFirst());
+ assertEquals("[email protected]",
user.getPlainAttr("userId").orElseThrow().getValues().getFirst());
+ assertEquals("John",
user.getPlainAttr("firstname").orElseThrow().getValues().getFirst());
+ assertEquals("Doe",
user.getPlainAttr("surname").orElseThrow().getValues().getFirst());
+ assertEquals("John Doe",
user.getPlainAttr("fullname").orElseThrow().getValues().getFirst());
+
+ // 5. update existing customer
+ debezium.update("UPDATE customers SET email=? WHERE id=?",
"[email protected]", 1004);
+
+ await().atMost(MAX_WAIT_SECONDS, TimeUnit.SECONDS).
+ pollInterval(1, TimeUnit.SECONDS).until(() ->
"[email protected]".
+
equals(USER_SERVICE.read("1004").getPlainAttr("email").orElseThrow().getValues().getFirst()));
+ user = USER_SERVICE.read("1004");
+ assertEquals("[email protected]",
user.getPlainAttr("userId").orElseThrow().getValues().getFirst());
+ assertEquals("Anne",
user.getPlainAttr("firstname").orElseThrow().getValues().getFirst());
+ assertEquals("Kretchmar",
user.getPlainAttr("surname").orElseThrow().getValues().getFirst());
+ assertEquals("Anne Kretchmar",
user.getPlainAttr("fullname").orElseThrow().getValues().getFirst());
+
+ // 6. delete customer
+ debezium.update("DELETE FROM customers WHERE id=?", 1005);
+
+ SyncopeClientException sce = await().atMost(MAX_WAIT_SECONDS,
TimeUnit.SECONDS).
+ pollInterval(1, TimeUnit.SECONDS).until(() -> {
+ try {
+ USER_SERVICE.read("1005");
+ return null;
+ } catch (SyncopeClientException e) {
+ return e;
+ }
+ }, Objects::nonNull);
+ assertEquals(ClientExceptionType.NotFound, sce.getType());
+ } finally {
+ // finally stop live syncing
+ TASK_SERVICE.actionJob(task.getKey(), JobAction.STOP);
+ }
+ }
+}
diff --git
a/fit/core-reference/src/test/java/org/apache/syncope/fit/core/LiveSyncITCase.java
b/fit/core-reference/src/test/java/org/apache/syncope/fit/core/LiveSyncITCase.java
index 158670a2ea..455c0fa40e 100644
---
a/fit/core-reference/src/test/java/org/apache/syncope/fit/core/LiveSyncITCase.java
+++
b/fit/core-reference/src/test/java/org/apache/syncope/fit/core/LiveSyncITCase.java
@@ -242,9 +242,9 @@ public class LiveSyncITCase extends AbstractITCase {
}, Objects::nonNull);
assertEquals(email,
user.getPlainAttr("email").orElseThrow().getValues().getFirst());
assertEquals(email,
user.getPlainAttr("userId").orElseThrow().getValues().getFirst());
- assertEquals(email,
user.getPlainAttr("userId").orElseThrow().getValues().getFirst());
assertEquals("LiveSync",
user.getPlainAttr("firstname").orElseThrow().getValues().getFirst());
assertEquals("LiveSync",
user.getPlainAttr("surname").orElseThrow().getValues().getFirst());
+ assertEquals("LiveSync LiveSync",
user.getPlainAttr("fullname").orElseThrow().getValues().getFirst());
await().atMost(MAX_WAIT_SECONDS, TimeUnit.SECONDS).pollInterval(1,
TimeUnit.SECONDS).until(
() -> TASK_SERVICE.read(TaskType.LIVE_SYNC,
actual.getKey(), true).getExecutions(),
diff --git a/fit/core-reference/src/test/resources/debezium_mysql_init.sql
b/fit/core-reference/src/test/resources/debezium_mysql_init.sql
new file mode 100644
index 0000000000..53682416ea
--- /dev/null
+++ b/fit/core-reference/src/test/resources/debezium_mysql_init.sql
@@ -0,0 +1,20 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+
+-- http://www.apache.org/licenses/LICENSE-2.0
+
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+CREATE USER 'syncope'@'%' IDENTIFIED WITH caching_sha2_password BY 'syncope';
+GRANT ALL PRIVILEGES ON *.* TO 'syncope'@'%';
+FLUSH PRIVILEGES;
diff --git a/fit/wa-reference/pom.xml b/fit/wa-reference/pom.xml
index 6173c96502..32af56830a 100644
--- a/fit/wa-reference/pom.xml
+++ b/fit/wa-reference/pom.xml
@@ -184,7 +184,7 @@ under the License.
</image>
<image>
<alias>openfga</alias>
- <name>openfga/openfga:${docker.openfga.version}</name>
+ <name>openfga/openfga:${openfga.version}</name>
<run>
<cmd>run</cmd>
</run>
diff --git a/pom.xml b/pom.xml
index 23af9d0656..c350252473 100644
--- a/pom.xml
+++ b/pom.xml
@@ -432,7 +432,9 @@ under the License.
<opensearch.version>2.19.1</opensearch.version>
<opensearch-java.version>2.22.0</opensearch-java.version>
- <docker.openfga.version>v1</docker.openfga.version>
+ <openfga.version>v1</openfga.version>
+
+ <debezium.version>3.0</debezium.version>
<commons-lang3.version>3.17.0</commons-lang3.version>
<commons-jexl.version>3.4.0</commons-jexl.version>