This is an automated email from the ASF dual-hosted git repository.
fjtiradosarti pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/incubator-kie-kogito-runtimes.git
The following commit(s) were added to refs/heads/main by this push:
new 8424c0067a Fix #3363 (#3368)
8424c0067a is described below
commit 8424c0067a43ce3f5ffe865abd7ff2b9b9a5b2a8
Author: Francisco Javier Tirado Sarti
<[email protected]>
AuthorDate: Wed Jan 24 19:12:09 2024 +0100
Fix #3363 (#3368)
* [Fix_#3363] Allowing multiple topic for kafka listener
* [Fix_#3663] Manual ack and not thread blocking approach
---
.../spring/SpringKafkaCloudEventReceiver.java | 47 +++++++--------
.../spring/SpringKafkaConsumerConfig.java | 9 ++-
.../cloudevents/spring/SpringTopicDiscovery.java | 68 ++++++++++++++++------
3 files changed, 79 insertions(+), 45 deletions(-)
diff --git
a/springboot/addons/messaging/implementation/src/main/java/org/kie/kogito/addon/cloudevents/spring/SpringKafkaCloudEventReceiver.java
b/springboot/addons/messaging/implementation/src/main/java/org/kie/kogito/addon/cloudevents/spring/SpringKafkaCloudEventReceiver.java
index a009f26c5a..aa6f1edf21 100644
---
a/springboot/addons/messaging/implementation/src/main/java/org/kie/kogito/addon/cloudevents/spring/SpringKafkaCloudEventReceiver.java
+++
b/springboot/addons/messaging/implementation/src/main/java/org/kie/kogito/addon/cloudevents/spring/SpringKafkaCloudEventReceiver.java
@@ -19,19 +19,18 @@
package org.kie.kogito.addon.cloudevents.spring;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutionException;
import java.util.function.Function;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.kie.kogito.config.ConfigBean;
import org.kie.kogito.event.CloudEventUnmarshallerFactory;
import org.kie.kogito.event.DataEvent;
import org.kie.kogito.event.EventReceiver;
import org.kie.kogito.event.EventUnmarshaller;
-import org.kie.kogito.event.KogitoEventStreams;
import org.kie.kogito.event.Subscription;
import org.kie.kogito.event.impl.CloudEventConverter;
import org.kie.kogito.event.impl.DataEventConverter;
@@ -39,7 +38,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.messaging.handler.annotation.Payload;
+import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import jakarta.annotation.PostConstruct;
@@ -67,34 +66,32 @@ public class SpringKafkaCloudEventReceiver implements
EventReceiver {
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public <T> void subscribe(Function<DataEvent<T>, CompletionStage<?>>
consumer, Class<T> clazz) {
-
consumers.add(
new Subscription(consumer, configBean.useCloudEvents() ? new
CloudEventConverter<>(clazz, cloudEventUnmarshaller)
: new DataEventConverter<>(clazz,
eventDataUnmarshaller)));
}
- @KafkaListener(topics = "${kogito.addon.cloudevents.kafka." +
KogitoEventStreams.INCOMING + ":" + KogitoEventStreams.INCOMING + "}")
- public void receive(@Payload Collection<String> messages) throws
InterruptedException {
- log.debug("Received {} events", messages.size());
- Collection<CompletionStage<?>> futures = new ArrayList<>();
- for (String message : messages) {
- for (Subscription<Object, String> consumer : consumers) {
- try {
-
futures.add(consumer.getConsumer().apply(consumer.getConverter().convert(message)));
- } catch (IOException e) {
- log.info("Cannot convert event to the proper type {}",
e.getMessage());
- }
- }
- }
- // wait for this batch to complete
- log.debug("Waiting for all operations in batch to complete");
- for (CompletionStage<?> future : futures) {
+ @KafkaListener(topics = { "#{springTopics.getIncomingTopics}" })
+ public void receive(ConsumerRecord<String, String> message, Acknowledgment
ack) throws InterruptedException {
+ log.debug("Receive message with key {} for topic {}", message.key(),
message.topic());
+ CompletionStage<?> future = CompletableFuture.completedFuture(null);
+ for (Subscription<Object, String> subscription : consumers) {
try {
- future.toCompletableFuture().get();
- } catch (ExecutionException ex) {
- log.error("Error executing consumer", ex.getCause());
+ Object object =
subscription.getConverter().convert(message.value());
+ future = future.thenCompose(f ->
subscription.getConsumer().apply(object));
+ } catch (IOException e) {
+ log.debug("Error converting event. Exception message is {}",
e.getMessage());
}
}
- log.debug("All operations in batch completed");
+ future.whenComplete((v, e) -> acknowledge(e, ack));
+ }
+
+ private void acknowledge(Throwable ex, Acknowledgment ack) {
+ if (ex != null) {
+ log.error("Event publishing failed", ex);
+ } else {
+ log.debug("Acknoledge message");
+ ack.acknowledge();
+ }
}
}
diff --git
a/springboot/addons/messaging/implementation/src/main/java/org/kie/kogito/addon/cloudevents/spring/SpringKafkaConsumerConfig.java
b/springboot/addons/messaging/implementation/src/main/java/org/kie/kogito/addon/cloudevents/spring/SpringKafkaConsumerConfig.java
index 1085bb3312..fee6d22955 100644
---
a/springboot/addons/messaging/implementation/src/main/java/org/kie/kogito/addon/cloudevents/spring/SpringKafkaConsumerConfig.java
+++
b/springboot/addons/messaging/implementation/src/main/java/org/kie/kogito/addon/cloudevents/spring/SpringKafkaConsumerConfig.java
@@ -18,6 +18,8 @@
*/
package org.kie.kogito.addon.cloudevents.spring;
+import java.util.Map;
+
import org.springframework.beans.factory.ObjectProvider;
import
org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
@@ -27,6 +29,7 @@ import org.springframework.kafka.annotation.EnableKafka;
import
org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.listener.ContainerProperties;
@EnableKafka
@Configuration
@@ -34,8 +37,9 @@ public class SpringKafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory(KafkaProperties
properties, ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {
+ Map<String, Object> props = properties.buildConsumerProperties();
DefaultKafkaConsumerFactory<String, String> factory = new
DefaultKafkaConsumerFactory<>(
- properties.buildConsumerProperties());
+ props);
customizers.orderedStream().forEach(customizer ->
customizer.customize(factory));
return factory;
}
@@ -44,7 +48,8 @@ public class SpringKafkaConsumerConfig {
public ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new
ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
- factory.setBatchListener(true);
+ factory.setBatchListener(false);
+
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
}
diff --git
a/springboot/addons/messaging/implementation/src/main/java/org/kie/kogito/addon/cloudevents/spring/SpringTopicDiscovery.java
b/springboot/addons/messaging/implementation/src/main/java/org/kie/kogito/addon/cloudevents/spring/SpringTopicDiscovery.java
index b3d462fdf2..bae999886c 100644
---
a/springboot/addons/messaging/implementation/src/main/java/org/kie/kogito/addon/cloudevents/spring/SpringTopicDiscovery.java
+++
b/springboot/addons/messaging/implementation/src/main/java/org/kie/kogito/addon/cloudevents/spring/SpringTopicDiscovery.java
@@ -19,40 +19,72 @@
package org.kie.kogito.addon.cloudevents.spring;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
import org.kie.kogito.addon.cloudevents.AbstractTopicDiscovery;
+import org.kie.kogito.event.ChannelType;
+import org.kie.kogito.event.EventKind;
import org.kie.kogito.event.KogitoEventStreams;
import org.kie.kogito.event.Topic;
-import org.springframework.beans.factory.annotation.Value;
+import org.kie.kogito.event.cloudevents.CloudEventMeta;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
-@Component
+@Component("springTopics")
public class SpringTopicDiscovery extends AbstractTopicDiscovery {
- // in the future we should be implementation agnostic
- @Value(value = "${kogito.addon.cloudevents.kafka." +
KogitoEventStreams.INCOMING + "}")
- String incomingStreamTopic;
+ private static final Logger logger =
LoggerFactory.getLogger(SpringTopicDiscovery.class);
+ private static final String KAFKA_PREFIX =
"kogito.addon.cloudevents.kafka.";
+ private static final String INCOMING_PREFIX = KAFKA_PREFIX +
KogitoEventStreams.INCOMING;
+ private static final String OUTGOING_PREFIX = KAFKA_PREFIX +
KogitoEventStreams.OUTGOING;
- @Value(value = "${kogito.addon.cloudevents.kafka." +
KogitoEventStreams.OUTGOING + "}")
- String outgoingStreamTopic;
+ @Autowired
+ private Environment env;
+
+ @Autowired(required = false)
+ private List<CloudEventMeta> cloudEventMetaList = Collections.emptyList();
+
+ public Set<String> getIncomingTopics() {
+ return getTopics(INCOMING_PREFIX, KogitoEventStreams.INCOMING,
EventKind.CONSUMED);
+ }
+
+ public Set<String> getOutgoingTopics() {
+ return getTopics(OUTGOING_PREFIX, KogitoEventStreams.OUTGOING,
EventKind.PRODUCED);
+ }
@Override
protected List<Topic> getTopics() {
- final List<Topic> topics = new ArrayList<>();
-
- if (incomingStreamTopic != null && !incomingStreamTopic.isEmpty()) {
- final Topic incoming = DEFAULT_INCOMING_CHANNEL;
- incoming.setName(incomingStreamTopic);
- topics.add(incoming);
+ List<Topic> topics = new ArrayList<>();
+ for (String topic : getIncomingTopics()) {
+ topics.add(new Topic(topic, ChannelType.INCOMING));
}
-
- if (outgoingStreamTopic != null && !outgoingStreamTopic.isEmpty()) {
- final Topic outgoing = DEFAULT_OUTGOING_CHANNEL;
- outgoing.setName(outgoingStreamTopic);
- topics.add(outgoing);
+ for (String topic : getOutgoingTopics()) {
+ topics.add(new Topic(topic, ChannelType.OUTGOING));
}
+ logger.debug("Using this list of topics {}", topics);
+ return topics;
+ }
+ private Set<String> getTopics(String prefix, String defaultChannel,
EventKind eventKind) {
+ final String defaultChannelName = env.getProperty(prefix,
defaultChannel);
+ Set<String> topics = cloudEventMetaList.stream().filter(c ->
c.getKind().equals(eventKind)).map(CloudEventMeta::getType).map(this::parserTopicType)
+ .map(t -> env.getProperty(prefix + "." + t,
defaultChannelName))
+ .collect(Collectors.toSet());
+ if (topics.isEmpty()) {
+ logger.debug("Using default channel name {}", defaultChannelName);
+ topics.add(defaultChannelName);
+ }
return topics;
}
+
+ private String parserTopicType(String topic) {
+ int indexOf = topic.lastIndexOf('.');
+ return indexOf == -1 ? topic : topic.substring(indexOf + 1);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]