This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 83d369772fb69125dc058b00dd33ef3938da13bf
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Tue Feb 1 12:40:44 2022 +0100

    CAMEL-15562: added a reusable kafka strategy
    
    This introduces a basic Kafka strategy that can be extended to publish
    offsets to a Kafka topic
---
 .../resume/kafka/AbstractKafkaResumeStrategy.java  | 281 +++++++++++++++++++++
 1 file changed, 281 insertions(+)

diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java
new file mode 100644
index 0000000..c8bd037
--- /dev/null
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.processor.resume.kafka;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.camel.Resumable;
+import org.apache.camel.UpdatableConsumerResumeStrategy;
+import org.apache.camel.util.StringHelper;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractKafkaResumeStrategy<K, V> implements 
UpdatableConsumerResumeStrategy<K, V, Resumable<K, V>> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AbstractKafkaResumeStrategy.class);
+
+    private final String topic;
+
+    private Consumer<K, V> consumer;
+    private Producer<K, V> producer;
+    private long errorCount;
+
+    private Map<K, List<V>> processedItems = new TreeMap<>();
+    private List<Future<RecordMetadata>> sentItems = new ArrayList<>();
+    private boolean subscribed;
+    private Properties producerConfig;
+    private Properties consumerConfig;
+
+    public AbstractKafkaResumeStrategy(String bootstrapServers, String topic) {
+        this.topic = topic;
+
+        this.producerConfig = createProducer(bootstrapServers);
+        this.consumerConfig = createConsumer(bootstrapServers);
+    }
+
+    public AbstractKafkaResumeStrategy(String bootstrapServers, String topic, 
Properties producerConfig,
+                                       Properties consumerConfig) {
+        this.topic = topic;
+        this.producerConfig = producerConfig;
+        this.consumerConfig = consumerConfig;
+    }
+
+    public void start() throws Exception {
+        consumer = new KafkaConsumer<>(consumerConfig);
+        producer = new KafkaProducer<>(producerConfig);
+
+        loadProcessedItems(processedItems);
+    }
+
+    private Properties createProducer(String bootstrapServers) {
+        Properties config = new Properties();
+
+        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
+        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
+
+        StringHelper.notEmpty(bootstrapServers, "bootstrapServers");
+        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+
+        // set up the producer to remove all batching on send, we want all 
sends
+        // to be fully synchronous
+        config.putIfAbsent(ProducerConfig.ACKS_CONFIG, "1");
+        config.putIfAbsent(ProducerConfig.BATCH_SIZE_CONFIG, "0");
+
+        return config;
+    }
+
+    private Properties createConsumer(String bootstrapServers) {
+        Properties config = new Properties();
+
+        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
+        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
+
+        StringHelper.notEmpty(bootstrapServers, "bootstrapServers");
+        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+
+        String groupId = UUID.randomUUID().toString();
+        LOG.debug("Creating consumer with {}[{}]", 
ConsumerConfig.GROUP_ID_CONFIG, groupId);
+
+        config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
Boolean.TRUE.toString());
+
+        return config;
+    }
+
+    /**
+     * Sends data to a topic
+     *
+     * @param  topic                the topic to send data to
+     * @param  message              the message to send
+     * @throws ExecutionException
+     * @throws InterruptedException
+     */
+    public void produce(String topic, K key, V message) throws 
ExecutionException, InterruptedException {
+        ProducerRecord<K, V> record = new ProducerRecord<>(topic, key, 
message);
+
+        errorCount = 0;
+        Future<RecordMetadata> future = producer.send(record, (recordMetadata, 
e) -> {
+            if (e != null) {
+                LOG.error("Failed to send message {}", e.getMessage(), e);
+                errorCount++;
+            }
+        });
+
+        sentItems.add(future);
+    }
+
+    @Override
+    public void updateLastOffset(Resumable<K, V> offset) throws Exception {
+        K key = offset.getAddressable();
+        V offsetValue = offset.getLastOffset().offset();
+
+        LOG.debug("Updating offset on Kafka with key {} to {}", key, 
offsetValue);
+
+        produce(topic, key, offsetValue);
+
+        // TODO: this leaks. It must be fixed for merge
+        var entries = processedItems.computeIfAbsent(key, k -> new 
ArrayList<>());
+        entries.add(offsetValue);
+    }
+
+    protected void loadProcessedItems(Map<K, List<V>> processed) throws 
Exception {
+        loadProcessedItems(processed, -1);
+    }
+
+    protected void loadProcessedItems(Map<K, List<V>> processed, int limit) 
throws Exception {
+        subscribe();
+
+        LOG.debug("Loading records from topic {}", topic);
+
+        ConsumerRecords<K, V> records;
+
+        do {
+            records = consume();
+
+            if (records.isEmpty()) {
+                break;
+            }
+
+            for (ConsumerRecord<K, V> record : records) {
+                V value = record.value();
+
+                LOG.trace("Read from Kafka: {}", value);
+                var entries = processed.computeIfAbsent(record.key(), k -> new 
ArrayList<>());
+                entries.add(record.value());
+
+                if (processed.size() >= limit) {
+                    break;
+                }
+            }
+        } while (true);
+
+        LOG.debug("Loaded {} records", processed.size());
+        unsubscribe();
+    }
+
+    // TODO: bad method ...
+    /**
+     * @param topic the topic to consume the messages from
+     */
+    public void checkAndSubscribe(String topic) {
+        if (!subscribed) {
+            consumer.subscribe(Collections.singletonList(topic));
+
+            subscribed = true;
+        }
+    }
+
+    // TODO: bad method ...
+    /**
+     * @param topic the topic to consume the messages from
+     */
+    public void checkAndSubscribe(String topic, long remaining) {
+        if (!subscribed) {
+            consumer.subscribe(Collections.singletonList(topic), new 
ConsumerRebalanceListener() {
+                @Override
+                public void onPartitionsRevoked(Collection<TopicPartition> 
collection) {
+
+                }
+
+                @Override
+                public void onPartitionsAssigned(Collection<TopicPartition> 
assignments) {
+                    consumer.seekToEnd(assignments);
+                    for (TopicPartition assignment : assignments) {
+                        final long endPosition = consumer.position(assignment);
+                        final long startPosition = endPosition - remaining;
+
+                        if (startPosition >= 0) {
+                            consumer.seek(assignment, startPosition);
+                        } else {
+                            LOG.info(
+                                    "Ignoring the seek command because the 
initial offset is negative (the topic is likely empty)");
+                        }
+                    }
+                }
+            });
+
+            subscribed = true;
+        }
+    }
+
+    public abstract void subscribe() throws Exception;
+
+    // TODO: bad method
+    public void unsubscribe() {
+        consumer.unsubscribe();
+    }
+
+    /**
+     * Consumes message from the given topic until the predicate returns false
+     *
+     * @return
+     */
+    public ConsumerRecords<K, V> consume() {
+        int retries = 10;
+
+        return consume(retries);
+    }
+
+    public ConsumerRecords<K, V> consume(int retries) {
+        while (retries > 0) {
+            ConsumerRecords<K, V> records = 
consumer.poll(Duration.ofMillis(100));
+            if (!records.isEmpty()) {
+                return records;
+            }
+            retries--;
+        }
+
+        return ConsumerRecords.empty();
+    }
+
+    protected Map<K, List<V>> getProcessedItems() {
+        return processedItems;
+    }
+
+    public long getErrorCount() {
+        return errorCount;
+    }
+
+    public List<Future<RecordMetadata>> getSentItems() {
+        return Collections.unmodifiableList(sentItems);
+    }
+
+}

Reply via email to