This is an automated email from the ASF dual-hosted git repository.
fjtiradosarti pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/incubator-kie-kogito-runtimes.git
The following commit(s) were added to refs/heads/main by this push:
new c82ab79a56 Add correlation persistence for MongoDB (#3612)
c82ab79a56 is described below
commit c82ab79a5693634b6e549fd4420dffba43735b49
Author: Matheus Cruz <[email protected]>
AuthorDate: Fri Aug 16 15:51:36 2024 -0300
Add correlation persistence for MongoDB (#3612)
* Add MongoDB support for correlation
* Apply pull request suggestions
---
.../correlation/MongoDBCorrelationRepository.java | 109 +++++++++++++++++++
.../correlation/MongoDBCorrelationService.java | 61 +++++++++++
.../correlation/MongoDBCorrelationServiceIT.java | 119 +++++++++++++++++++++
.../quarkus/MongoDBCorrelationServiceProducer.java | 37 +++++++
4 files changed, 326 insertions(+)
diff --git
a/addons/common/persistence/mongodb/src/main/java/org/kie/kogito/mongodb/correlation/MongoDBCorrelationRepository.java
b/addons/common/persistence/mongodb/src/main/java/org/kie/kogito/mongodb/correlation/MongoDBCorrelationRepository.java
new file mode 100644
index 0000000000..0aa053a4cc
--- /dev/null
+++
b/addons/common/persistence/mongodb/src/main/java/org/kie/kogito/mongodb/correlation/MongoDBCorrelationRepository.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.mongodb.correlation;
+
+import java.io.UncheckedIOException;
+import java.util.Map;
+
+import org.bson.Document;
+import org.bson.codecs.configuration.CodecRegistries;
+import org.bson.codecs.configuration.CodecRegistry;
+import org.bson.conversions.Bson;
+import org.kie.kogito.correlation.CompositeCorrelation;
+import org.kie.kogito.correlation.Correlation;
+import org.kie.kogito.correlation.CorrelationInstance;
+import org.kie.kogito.correlation.SimpleCorrelation;
+import org.kie.kogito.jackson.utils.ObjectMapperFactory;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.mongodb.MongoClientSettings;
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.Filters;
+import com.mongodb.client.result.InsertOneResult;
+
+public class MongoDBCorrelationRepository {
+
+ private final MongoCollection<Document> collection;
+ private final ObjectMapper objectMapper;
+
+ private static final String ENCODED_CORRELATION_ID_FIELD =
"encodedCorrelationId";
+ private static final String CORRELATED_ID_FIELD = "correlatedId";
+ private static final String CORRELATION_FIELD = "correlation";
+ private static final String CORRELATION_COLLECTION_NAME = "correlations";
+
+ public MongoDBCorrelationRepository(MongoClient mongoClient, String
dbName) {
+ CodecRegistry registry =
CodecRegistries.fromRegistries(MongoClientSettings.getDefaultCodecRegistry());
+ this.collection =
mongoClient.getDatabase(dbName).getCollection(CORRELATION_COLLECTION_NAME).withCodecRegistry(registry);
+ SimpleModule simpleModule = new SimpleModule();
+ simpleModule.addAbstractTypeMapping(Correlation.class,
SimpleCorrelation.class);
+ this.objectMapper =
ObjectMapperFactory.get().copy().registerModule(simpleModule);
+ }
+
+ public CorrelationInstance insert(final String encodedCorrelationId, final
String correlatedId, final Correlation correlation) {
+
+ CorrelationInstance correlationInstance = new
CorrelationInstance(encodedCorrelationId, correlatedId, correlation);
+ try {
+ Map<String, Object> object = Map.of(
+ ENCODED_CORRELATION_ID_FIELD, encodedCorrelationId,
+ CORRELATED_ID_FIELD, correlatedId,
+ CORRELATION_FIELD, correlation);
+ String json = this.objectMapper.writeValueAsString(object);
+ InsertOneResult insertOneResult =
this.collection.insertOne(Document.parse(json));
+ return insertOneResult.getInsertedId() != null ?
correlationInstance : null;
+ } catch (JsonProcessingException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ public CorrelationInstance findByEncodedCorrelationId(String encoded) {
+ Bson eq = Filters.eq(ENCODED_CORRELATION_ID_FIELD, encoded);
+ return getCorrelationInstanceByFilter(eq);
+ }
+
+ public CorrelationInstance findByCorrelatedId(String correlatedId) {
+ Bson eq = Filters.eq(CORRELATED_ID_FIELD, correlatedId);
+ return getCorrelationInstanceByFilter(eq);
+ }
+
+ private CorrelationInstance getCorrelationInstanceByFilter(Bson eq) {
+ Document first = this.collection.find(eq).first();
+ if (first == null) {
+ return null;
+ } else {
+ Document document = first.get(CORRELATION_FIELD, Document.class);
+ try {
+ CompositeCorrelation compositeCorrelation =
this.objectMapper.readValue(document.toJson(), CompositeCorrelation.class);
+ return new CorrelationInstance(
+ first.getString(ENCODED_CORRELATION_ID_FIELD),
+ first.getString(CORRELATED_ID_FIELD),
+ compositeCorrelation);
+ } catch (JsonProcessingException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+ }
+
+ public void delete(String encoded) {
+ Bson eq = Filters.eq(ENCODED_CORRELATION_ID_FIELD, encoded);
+ this.collection.deleteOne(eq);
+ }
+}
diff --git
a/addons/common/persistence/mongodb/src/main/java/org/kie/kogito/mongodb/correlation/MongoDBCorrelationService.java
b/addons/common/persistence/mongodb/src/main/java/org/kie/kogito/mongodb/correlation/MongoDBCorrelationService.java
new file mode 100644
index 0000000000..f3fe19ae49
--- /dev/null
+++
b/addons/common/persistence/mongodb/src/main/java/org/kie/kogito/mongodb/correlation/MongoDBCorrelationService.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.mongodb.correlation;
+
+import java.util.Optional;
+
+import org.kie.kogito.correlation.Correlation;
+import org.kie.kogito.correlation.CorrelationEncoder;
+import org.kie.kogito.correlation.CorrelationInstance;
+import org.kie.kogito.correlation.CorrelationService;
+import org.kie.kogito.event.correlation.MD5CorrelationEncoder;
+
+public class MongoDBCorrelationService implements CorrelationService {
+
+ private final MongoDBCorrelationRepository correlationRepository;
+ private final CorrelationEncoder correlationEncoder;
+
+ public MongoDBCorrelationService(MongoDBCorrelationRepository
correlationRepository) {
+ this.correlationRepository = correlationRepository;
+ this.correlationEncoder = new MD5CorrelationEncoder();
+ }
+
+ @Override
+ public CorrelationInstance create(Correlation correlation, String
correlatedId) {
+ String encodedCorrelationId =
this.correlationEncoder.encode(correlation);
+ return this.correlationRepository.insert(encodedCorrelationId,
correlatedId, correlation);
+ }
+
+ @Override
+ public Optional<CorrelationInstance> find(Correlation correlation) {
+ String encodedCorrelationId = correlationEncoder.encode(correlation);
+ return
Optional.ofNullable(this.correlationRepository.findByEncodedCorrelationId(encodedCorrelationId));
+ }
+
+ @Override
+ public Optional<CorrelationInstance> findByCorrelatedId(String
correlatedId) {
+ return
Optional.ofNullable(this.correlationRepository.findByCorrelatedId(correlatedId));
+ }
+
+ @Override
+ public void delete(Correlation correlation) {
+ String encodedCorrelationId = correlationEncoder.encode(correlation);
+ this.correlationRepository.delete(encodedCorrelationId);
+ }
+}
diff --git
a/addons/common/persistence/mongodb/src/test/java/org/kie/kogito/mongodb/correlation/MongoDBCorrelationServiceIT.java
b/addons/common/persistence/mongodb/src/test/java/org/kie/kogito/mongodb/correlation/MongoDBCorrelationServiceIT.java
new file mode 100644
index 0000000000..f3abeaf61b
--- /dev/null
+++
b/addons/common/persistence/mongodb/src/test/java/org/kie/kogito/mongodb/correlation/MongoDBCorrelationServiceIT.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.mongodb.correlation;
+
+import java.util.Collections;
+import java.util.Optional;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.kie.kogito.correlation.CompositeCorrelation;
+import org.kie.kogito.correlation.CorrelationInstance;
+import org.kie.kogito.correlation.SimpleCorrelation;
+import org.kie.kogito.testcontainers.KogitoMongoDBContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@Testcontainers
+class MongoDBCorrelationServiceIT {
+
+ @Container
+ final static KogitoMongoDBContainer mongoDBContainer = new
KogitoMongoDBContainer();
+ private static MongoDBCorrelationService correlationService;
+ private static MongoClient mongoClient;
+ private static final String DB_NAME = "test";
+ private static final String COLLECTION_NAME = "correlations";
+
+ @BeforeAll
+ static void setUp() {
+ mongoDBContainer.start();
+ mongoClient = MongoClients.create(mongoDBContainer.getReplicaSetUrl());
+ correlationService = new MongoDBCorrelationService(new
MongoDBCorrelationRepository(
+ mongoClient, DB_NAME));
+ }
+
+ @BeforeEach
+ void beforeEach() {
+ mongoClient.getDatabase(DB_NAME).getCollection(COLLECTION_NAME).drop();
+ }
+
+ @Test
+ void shouldSaveCorrelation() {
+ // arrange
+ String correlatedId = "id";
+ CompositeCorrelation correlation = new
CompositeCorrelation(Collections.singleton(new SimpleCorrelation<>("city", "Rio
de Janeiro")));
+
+ // act
+ correlationService.create(correlation, correlatedId);
+
+ // assert
+ Optional<CorrelationInstance> byCorrelatedId =
correlationService.findByCorrelatedId(correlatedId);
+ assertThat(byCorrelatedId).isNotEmpty();
+ }
+
+ @Test
+ void shouldDeleteCorrelation() {
+ // arrange
+ String correlatedId = "id";
+ CompositeCorrelation correlation = new
CompositeCorrelation(Collections.singleton(new SimpleCorrelation<>("city", "São
Paulo")));
+ correlationService.create(correlation, correlatedId);
+
+ // act
+ correlationService.delete(correlation);
+
+ // assert
+
assertThat(correlationService.findByCorrelatedId(correlatedId)).isEmpty();
+ }
+
+ @Test
+ void shouldFindByCorrelatedId() {
+ // arrange
+ String correlatedId = "id";
+ CompositeCorrelation correlation = new
CompositeCorrelation(Collections.singleton(new SimpleCorrelation<>("city",
"Goiânia")));
+ correlationService.create(correlation, correlatedId);
+
+ // act
+ Optional<CorrelationInstance> byCorrelatedId =
correlationService.findByCorrelatedId(correlatedId);
+
+ // assert
+ assertThat(byCorrelatedId).isNotEmpty();
+ }
+
+ @Test
+ void shouldFindByCorrelation() {
+ // arrange
+ CompositeCorrelation correlation = new
CompositeCorrelation(Collections.singleton(new SimpleCorrelation<>("city",
"Osasco")));
+ String correlatedId = "id";
+
+ correlationService.create(correlation, correlatedId);
+
+ // act
+ Optional<CorrelationInstance> correlationInstance =
correlationService.find(correlation);
+
+ // assert
+ assertThat(correlationInstance).isNotEmpty();
+ }
+
+}
diff --git
a/quarkus/addons/persistence/mongodb/runtime/src/main/java/org/kie/kogito/persistence/quarkus/MongoDBCorrelationServiceProducer.java
b/quarkus/addons/persistence/mongodb/runtime/src/main/java/org/kie/kogito/persistence/quarkus/MongoDBCorrelationServiceProducer.java
new file mode 100644
index 0000000000..4ec1308607
--- /dev/null
+++
b/quarkus/addons/persistence/mongodb/runtime/src/main/java/org/kie/kogito/persistence/quarkus/MongoDBCorrelationServiceProducer.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.kie.kogito.persistence.quarkus;
+
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+import org.kie.kogito.mongodb.correlation.MongoDBCorrelationRepository;
+import org.kie.kogito.mongodb.correlation.MongoDBCorrelationService;
+
+import com.mongodb.client.MongoClient;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.inject.Produces;
+
+@ApplicationScoped
+public class MongoDBCorrelationServiceProducer {
+
+ @Produces
+ public MongoDBCorrelationService getMongoDBCorrelationService(MongoClient
mongoClient, @ConfigProperty(name = "quarkus.mongodb.database", defaultValue =
"kogito") String dbName) {
+ return new MongoDBCorrelationService(new
MongoDBCorrelationRepository(mongoClient, dbName));
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]