This is an automated email from the ASF dual-hosted git repository.

fjtiradosarti pushed a commit to branch main
in repository 
https://gitbox.apache.org/repos/asf/incubator-kie-kogito-runtimes.git


The following commit(s) were added to refs/heads/main by this push:
     new 8424c0067a Fix #3363 (#3368)
8424c0067a is described below

commit 8424c0067a43ce3f5ffe865abd7ff2b9b9a5b2a8
Author: Francisco Javier Tirado Sarti 
<[email protected]>
AuthorDate: Wed Jan 24 19:12:09 2024 +0100

    Fix #3363 (#3368)
    
    * [Fix_#3363] Allowing multiple topics for kafka listener
    
    * [Fix_#3363] Manual ack and not thread blocking approach
---
 .../spring/SpringKafkaCloudEventReceiver.java      | 47 +++++++--------
 .../spring/SpringKafkaConsumerConfig.java          |  9 ++-
 .../cloudevents/spring/SpringTopicDiscovery.java   | 68 ++++++++++++++++------
 3 files changed, 79 insertions(+), 45 deletions(-)

diff --git 
a/springboot/addons/messaging/implementation/src/main/java/org/kie/kogito/addon/cloudevents/spring/SpringKafkaCloudEventReceiver.java
 
b/springboot/addons/messaging/implementation/src/main/java/org/kie/kogito/addon/cloudevents/spring/SpringKafkaCloudEventReceiver.java
index a009f26c5a..aa6f1edf21 100644
--- 
a/springboot/addons/messaging/implementation/src/main/java/org/kie/kogito/addon/cloudevents/spring/SpringKafkaCloudEventReceiver.java
+++ 
b/springboot/addons/messaging/implementation/src/main/java/org/kie/kogito/addon/cloudevents/spring/SpringKafkaCloudEventReceiver.java
@@ -19,19 +19,18 @@
 package org.kie.kogito.addon.cloudevents.spring;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.kie.kogito.config.ConfigBean;
 import org.kie.kogito.event.CloudEventUnmarshallerFactory;
 import org.kie.kogito.event.DataEvent;
 import org.kie.kogito.event.EventReceiver;
 import org.kie.kogito.event.EventUnmarshaller;
-import org.kie.kogito.event.KogitoEventStreams;
 import org.kie.kogito.event.Subscription;
 import org.kie.kogito.event.impl.CloudEventConverter;
 import org.kie.kogito.event.impl.DataEventConverter;
@@ -39,7 +38,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.messaging.handler.annotation.Payload;
+import org.springframework.kafka.support.Acknowledgment;
 import org.springframework.stereotype.Component;
 
 import jakarta.annotation.PostConstruct;
@@ -67,34 +66,32 @@ public class SpringKafkaCloudEventReceiver implements 
EventReceiver {
     @SuppressWarnings({ "rawtypes", "unchecked" })
     @Override
     public <T> void subscribe(Function<DataEvent<T>, CompletionStage<?>> 
consumer, Class<T> clazz) {
-
         consumers.add(
                 new Subscription(consumer, configBean.useCloudEvents() ? new 
CloudEventConverter<>(clazz, cloudEventUnmarshaller)
                         : new DataEventConverter<>(clazz, 
eventDataUnmarshaller)));
     }
 
-    @KafkaListener(topics = "${kogito.addon.cloudevents.kafka." + 
KogitoEventStreams.INCOMING + ":" + KogitoEventStreams.INCOMING + "}")
-    public void receive(@Payload Collection<String> messages) throws 
InterruptedException {
-        log.debug("Received {} events", messages.size());
-        Collection<CompletionStage<?>> futures = new ArrayList<>();
-        for (String message : messages) {
-            for (Subscription<Object, String> consumer : consumers) {
-                try {
-                    
futures.add(consumer.getConsumer().apply(consumer.getConverter().convert(message)));
-                } catch (IOException e) {
-                    log.info("Cannot convert event to the proper type {}", 
e.getMessage());
-                }
-            }
-        }
-        // wait for this batch to complete
-        log.debug("Waiting for all operations in batch to complete");
-        for (CompletionStage<?> future : futures) {
+    @KafkaListener(topics = { "#{springTopics.getIncomingTopics}" })
+    public void receive(ConsumerRecord<String, String> message, Acknowledgment 
ack) throws InterruptedException {
+        log.debug("Receive message with key {} for topic {}", message.key(), 
message.topic());
+        CompletionStage<?> future = CompletableFuture.completedFuture(null);
+        for (Subscription<Object, String> subscription : consumers) {
             try {
-                future.toCompletableFuture().get();
-            } catch (ExecutionException ex) {
-                log.error("Error executing consumer", ex.getCause());
+                Object object = 
subscription.getConverter().convert(message.value());
+                future = future.thenCompose(f -> 
subscription.getConsumer().apply(object));
+            } catch (IOException e) {
+                log.debug("Error converting event. Exception message is {}", 
e.getMessage());
             }
         }
-        log.debug("All operations in batch completed");
+        future.whenComplete((v, e) -> acknowledge(e, ack));
+    }
+
+    private void acknowledge(Throwable ex, Acknowledgment ack) {
+        if (ex != null) {
+            log.error("Event publishing failed", ex);
+        } else {
+            log.debug("Acknoledge message");
+            ack.acknowledge();
+        }
     }
 }
diff --git 
a/springboot/addons/messaging/implementation/src/main/java/org/kie/kogito/addon/cloudevents/spring/SpringKafkaConsumerConfig.java
 
b/springboot/addons/messaging/implementation/src/main/java/org/kie/kogito/addon/cloudevents/spring/SpringKafkaConsumerConfig.java
index 1085bb3312..fee6d22955 100644
--- 
a/springboot/addons/messaging/implementation/src/main/java/org/kie/kogito/addon/cloudevents/spring/SpringKafkaConsumerConfig.java
+++ 
b/springboot/addons/messaging/implementation/src/main/java/org/kie/kogito/addon/cloudevents/spring/SpringKafkaConsumerConfig.java
@@ -18,6 +18,8 @@
  */
 package org.kie.kogito.addon.cloudevents.spring;
 
+import java.util.Map;
+
 import org.springframework.beans.factory.ObjectProvider;
 import 
org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
 import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
@@ -27,6 +29,7 @@ import org.springframework.kafka.annotation.EnableKafka;
 import 
org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
 import org.springframework.kafka.core.ConsumerFactory;
 import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.listener.ContainerProperties;
 
 @EnableKafka
 @Configuration
@@ -34,8 +37,9 @@ public class SpringKafkaConsumerConfig {
 
     @Bean
     public ConsumerFactory<String, String> consumerFactory(KafkaProperties 
properties, ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {
+        Map<String, Object> props = properties.buildConsumerProperties();
         DefaultKafkaConsumerFactory<String, String> factory = new 
DefaultKafkaConsumerFactory<>(
-                properties.buildConsumerProperties());
+                props);
         customizers.orderedStream().forEach(customizer -> 
customizer.customize(factory));
         return factory;
     }
@@ -44,7 +48,8 @@ public class SpringKafkaConsumerConfig {
     public ConcurrentKafkaListenerContainerFactory<String, String> 
kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
         ConcurrentKafkaListenerContainerFactory<String, String> factory = new 
ConcurrentKafkaListenerContainerFactory<>();
         factory.setConsumerFactory(consumerFactory);
-        factory.setBatchListener(true);
+        factory.setBatchListener(false);
+        
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
         return factory;
     }
 }
diff --git 
a/springboot/addons/messaging/implementation/src/main/java/org/kie/kogito/addon/cloudevents/spring/SpringTopicDiscovery.java
 
b/springboot/addons/messaging/implementation/src/main/java/org/kie/kogito/addon/cloudevents/spring/SpringTopicDiscovery.java
index b3d462fdf2..bae999886c 100644
--- 
a/springboot/addons/messaging/implementation/src/main/java/org/kie/kogito/addon/cloudevents/spring/SpringTopicDiscovery.java
+++ 
b/springboot/addons/messaging/implementation/src/main/java/org/kie/kogito/addon/cloudevents/spring/SpringTopicDiscovery.java
@@ -19,40 +19,72 @@
 package org.kie.kogito.addon.cloudevents.spring;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.kie.kogito.addon.cloudevents.AbstractTopicDiscovery;
+import org.kie.kogito.event.ChannelType;
+import org.kie.kogito.event.EventKind;
 import org.kie.kogito.event.KogitoEventStreams;
 import org.kie.kogito.event.Topic;
-import org.springframework.beans.factory.annotation.Value;
+import org.kie.kogito.event.cloudevents.CloudEventMeta;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.env.Environment;
 import org.springframework.stereotype.Component;
 
-@Component
+@Component("springTopics")
 public class SpringTopicDiscovery extends AbstractTopicDiscovery {
 
-    // in the future we should be implementation agnostic
-    @Value(value = "${kogito.addon.cloudevents.kafka." + 
KogitoEventStreams.INCOMING + "}")
-    String incomingStreamTopic;
+    private static final Logger logger = 
LoggerFactory.getLogger(SpringTopicDiscovery.class);
+    private static final String KAFKA_PREFIX = 
"kogito.addon.cloudevents.kafka.";
+    private static final String INCOMING_PREFIX = KAFKA_PREFIX + 
KogitoEventStreams.INCOMING;
+    private static final String OUTGOING_PREFIX = KAFKA_PREFIX + 
KogitoEventStreams.OUTGOING;
 
-    @Value(value = "${kogito.addon.cloudevents.kafka." + 
KogitoEventStreams.OUTGOING + "}")
-    String outgoingStreamTopic;
+    @Autowired
+    private Environment env;
+
+    @Autowired(required = false)
+    private List<CloudEventMeta> cloudEventMetaList = Collections.emptyList();
+
+    public Set<String> getIncomingTopics() {
+        return getTopics(INCOMING_PREFIX, KogitoEventStreams.INCOMING, 
EventKind.CONSUMED);
+    }
+
+    public Set<String> getOutgoingTopics() {
+        return getTopics(OUTGOING_PREFIX, KogitoEventStreams.OUTGOING, 
EventKind.PRODUCED);
+    }
 
     @Override
     protected List<Topic> getTopics() {
-        final List<Topic> topics = new ArrayList<>();
-
-        if (incomingStreamTopic != null && !incomingStreamTopic.isEmpty()) {
-            final Topic incoming = DEFAULT_INCOMING_CHANNEL;
-            incoming.setName(incomingStreamTopic);
-            topics.add(incoming);
+        List<Topic> topics = new ArrayList<>();
+        for (String topic : getIncomingTopics()) {
+            topics.add(new Topic(topic, ChannelType.INCOMING));
         }
-
-        if (outgoingStreamTopic != null && !outgoingStreamTopic.isEmpty()) {
-            final Topic outgoing = DEFAULT_OUTGOING_CHANNEL;
-            outgoing.setName(outgoingStreamTopic);
-            topics.add(outgoing);
+        for (String topic : getOutgoingTopics()) {
+            topics.add(new Topic(topic, ChannelType.OUTGOING));
         }
+        logger.debug("Using this list of topics {}", topics);
+        return topics;
+    }
 
+    private Set<String> getTopics(String prefix, String defaultChannel, 
EventKind eventKind) {
+        final String defaultChannelName = env.getProperty(prefix, 
defaultChannel);
+        Set<String> topics = cloudEventMetaList.stream().filter(c -> 
c.getKind().equals(eventKind)).map(CloudEventMeta::getType).map(this::parserTopicType)
+                .map(t -> env.getProperty(prefix + "." + t, 
defaultChannelName))
+                .collect(Collectors.toSet());
+        if (topics.isEmpty()) {
+            logger.debug("Using default channel name {}", defaultChannelName);
+            topics.add(defaultChannelName);
+        }
         return topics;
     }
+
+    private String parserTopicType(String topic) {
+        int indexOf = topic.lastIndexOf('.');
+        return indexOf == -1 ? topic : topic.substring(indexOf + 1);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to