This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 6a55b310a0606c290d99bb41d77ce08df35c9c82 Author: Otavio Rodolfo Piske <[email protected]> AuthorDate: Wed Feb 23 14:57:44 2022 +0100 CAMEL-15562: abstract the underlying cache so it's configurable --- .../resume/kafka/AbstractKafkaResumeStrategy.java | 40 +++++++---------- .../main/java/org/apache/camel/ResumeCache.java | 52 ++++++++++++++++++++++ 2 files changed, 68 insertions(+), 24 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java index 4fad434..a779c00 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java @@ -23,14 +23,13 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Properties; -import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import org.apache.camel.Resumable; +import org.apache.camel.ResumeCache; import org.apache.camel.Service; import org.apache.camel.UpdatableConsumerResumeStrategy; import org.apache.camel.util.StringHelper; @@ -62,24 +61,30 @@ public abstract class AbstractKafkaResumeStrategy<K, V> private Producer<K, V> producer; private long errorCount; - private Map<K, List<V>> processedItems = new TreeMap<>(); - private List<Future<RecordMetadata>> sentItems = new ArrayList<>(); + private final List<Future<RecordMetadata>> sentItems = new ArrayList<>(); + private final ResumeCache<K, V> resumeCache; private boolean subscribed; private Properties producerConfig; private Properties consumerConfig; - public AbstractKafkaResumeStrategy(String bootstrapServers, String topic) { + public AbstractKafkaResumeStrategy(String bootstrapServers, String topic, ResumeCache<K, V> resumeCache) { this.topic = topic; this.producerConfig = createProducer(bootstrapServers); this.consumerConfig = createConsumer(bootstrapServers); + this.resumeCache = resumeCache; + + init(); } - public AbstractKafkaResumeStrategy(String topic, Properties producerConfig, + public AbstractKafkaResumeStrategy(String topic, ResumeCache<K, V> resumeCache, Properties producerConfig, Properties consumerConfig) { this.topic = topic; + this.resumeCache = resumeCache; this.producerConfig = producerConfig; this.consumerConfig = consumerConfig; + + init(); } private Properties createProducer(String bootstrapServers) { @@ -148,22 +153,15 @@ public abstract class AbstractKafkaResumeStrategy<K, V> produce(topic, key, offsetValue); - // TODO: this leaks. It must be fixed for merge - var entries = processedItems.computeIfAbsent(key, k -> new ArrayList<>()); - entries.add(offsetValue); - } - - protected void loadProcessedItems(Map<K, List<V>> processed) throws Exception { - loadProcessedItems(processed, UNLIMITED); + resumeCache.add(key, offsetValue); } - protected void loadProcessedItems(Map<K, List<V>> processed, int limit) throws Exception { + protected void loadCache() throws Exception { subscribe(); LOG.debug("Loading records from topic {}", topic); ConsumerRecords<K, V> records; - do { records = consume(); @@ -175,16 +173,14 @@ public abstract class AbstractKafkaResumeStrategy<K, V> V value = record.value(); LOG.trace("Read from Kafka: {}", value); - var entries = processed.computeIfAbsent(record.key(), k -> new ArrayList<>()); - entries.add(record.value()); + resumeCache.add(record.key(), record.value()); - if (limit != UNLIMITED && processed.size() >= limit) { + if (resumeCache.isFull()) { break; } } } while (true); - LOG.debug("Loaded {} records", processed.size()); unsubscribe(); } @@ -263,10 +259,6 @@ public abstract class AbstractKafkaResumeStrategy<K, V> return ConsumerRecords.empty(); } - protected Map<K, List<V>> getProcessedItems() { - return processedItems; - } - public long getErrorCount() { return errorCount; } @@ -309,7 +301,7 @@ public abstract class AbstractKafkaResumeStrategy<K, V> LOG.info("Starting the kafka resume strategy"); try { - loadProcessedItems(processedItems); + loadCache(); } catch (Exception e) { LOG.error("Failed to load already processed items: {}", e.getMessage(), e); } diff --git a/core/camel-api/src/main/java/org/apache/camel/ResumeCache.java b/core/camel-api/src/main/java/org/apache/camel/ResumeCache.java new file mode 100644 index 0000000..0abe2e6 --- /dev/null +++ b/core/camel-api/src/main/java/org/apache/camel/ResumeCache.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel; + +import java.util.Optional; + +/** + * This cache stored the resumed data from a {@link ResumeStrategy}. + * + * @param <K> the type of the key + * @param <V> the type of the offset value + */ +public interface ResumeCache<K, V> { + + /** + * Adds a value to the cache + * + * @param key the key to add + * @param offsetValue the offset value + */ + void add(K key, V offsetValue); + + /** + * Checks whether the cache is full + * + * @return true if full, or false otherwise + */ + boolean isFull(); + + /** + * Gets the offset value for the key + * + * @param key the key + * @return the key + */ + Optional<V> get(K key); +}
