This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 75a75ca47b0fbea3ef61a7a774b003b9f9800e20
Author: Otavio Rodolfo Piske <angusyo...@gmail.com>
AuthorDate: Fri Jun 17 15:01:45 2022 +0200

    CAMEL-18128: provide a way for integrations to configure specific details for the resume strategy
---
 .../kafka/KafkaResumeStrategyConfiguration.java    |  61 +++++++
 .../KafkaResumeStrategyConfigurationBuilder.java   | 187 +++++++++++++++++++++
 .../resume/kafka/MultiNodeKafkaResumeStrategy.java |  52 ++----
 .../kafka/SingleNodeKafkaResumeStrategy.java       | 124 +++-----------
 .../camel/resume/ResumeStrategyConfiguration.java  |  43 +++++
 .../resume/ResumeStrategyConfigurationBuilder.java |  42 +++++
 .../BasicResumeStrategyConfigurationBuilder.java   |  41 +++++
 7 files changed, 404 insertions(+), 146 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfiguration.java
new file mode 100644
index 00000000000..2e314079223
--- /dev/null
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfiguration.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.processor.resume.kafka;
+
+import java.util.Properties;
+
+import org.apache.camel.resume.ResumeStrategyConfiguration;
+
+/**
+ * A configuration suitable for using with the {@link KafkaResumeStrategy} and any of its implementations
+ */
+public class KafkaResumeStrategyConfiguration extends 
ResumeStrategyConfiguration {
+    private Properties producerProperties;
+    private Properties consumerProperties;
+    private String topic;
+
+    public Properties getProducerProperties() {
+        return producerProperties;
+    }
+
+    void setProducerProperties(Properties producerProperties) {
+        assert producerProperties != null;
+
+        this.producerProperties = producerProperties;
+    }
+
+    public Properties getConsumerProperties() {
+        return consumerProperties;
+    }
+
+    void setConsumerProperties(Properties consumerProperties) {
+        assert consumerProperties != null;
+
+        this.consumerProperties = consumerProperties;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    void setTopic(String topic) {
+        assert topic != null;
+
+        this.topic = topic;
+    }
+}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfigurationBuilder.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfigurationBuilder.java
new file mode 100644
index 00000000000..150936148d0
--- /dev/null
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfigurationBuilder.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.processor.resume.kafka;
+
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.camel.resume.Cacheable;
+import org.apache.camel.support.resume.BasicResumeStrategyConfigurationBuilder;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.StringHelper;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A configuration builder appropriate for building configurations for the {@link SingleNodeKafkaResumeStrategy},
+ * {@link MultiNodeKafkaResumeStrategy} and any of their subclasses
+ */
+public class KafkaResumeStrategyConfigurationBuilder
+        extends
+        
BasicResumeStrategyConfigurationBuilder<KafkaResumeStrategyConfigurationBuilder,
 KafkaResumeStrategyConfiguration> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaResumeStrategyConfigurationBuilder.class);
+
+    private Properties producerProperties;
+    private Properties consumerProperties;
+    private String topic;
+
+    private KafkaResumeStrategyConfigurationBuilder() {
+    }
+
+    public KafkaResumeStrategyConfigurationBuilder(Properties 
producerProperties, Properties consumerProperties) {
+        this.producerProperties = ObjectHelper.notNull(producerProperties, 
"producerProperties");
+        this.consumerProperties = ObjectHelper.notNull(consumerProperties, 
"consumerProperties");
+    }
+
+    @Override
+    public KafkaResumeStrategyConfigurationBuilder 
withCacheFillPolicy(Cacheable.FillPolicy cacheFillPolicy) {
+        if (cacheFillPolicy == Cacheable.FillPolicy.MINIMIZING) {
+            
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"latest");
+        } else {
+            
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
+
+        }
+
+        return super.withCacheFillPolicy(cacheFillPolicy);
+    }
+
+    public KafkaResumeStrategyConfigurationBuilder withProducerProperty(String 
key, Object value) {
+        producerProperties.put(key, value);
+
+        return this;
+    }
+
+    public KafkaResumeStrategyConfigurationBuilder withConsumerProperty(String 
key, Object value) {
+        consumerProperties.put(key, value);
+
+        return this;
+    }
+
+    public KafkaResumeStrategyConfigurationBuilder withGroupId(String value) {
+        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, value);
+
+        return this;
+    }
+
+    public KafkaResumeStrategyConfigurationBuilder 
withEnableAutoCommit(boolean value) {
+        consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
Boolean.valueOf(value));
+
+        return this;
+    }
+
+    public KafkaResumeStrategyConfigurationBuilder withBootstrapServers(String 
value) {
+        final String bootstrapServers = StringHelper.notEmpty(value, 
"bootstrapServers");
+
+        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers);
+        producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers);
+
+        return this;
+    }
+
+    public KafkaResumeStrategyConfigurationBuilder withTopic(String value) {
+        this.topic = value;
+
+        return this;
+    }
+
+    /**
+     * Creates a basic consumer
+     *
+     * @return A set of default properties for consuming byte-based key/pair records from Kafka
+     */
+    public static Properties createConsumerProperties() {
+        Properties config = new Properties();
+
+        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+        return config;
+    }
+
+    /**
+     * Creates a basic producer
+     *
+     * @return A set of default properties for producing byte-based key/pair records from Kafka
+     */
+    public static Properties createProducerProperties() {
+        Properties config = new Properties();
+
+        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
+        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
+
+        return config;
+    }
+
+    @Override
+    public KafkaResumeStrategyConfiguration build() {
+        KafkaResumeStrategyConfiguration resumeStrategyConfiguration = new 
KafkaResumeStrategyConfiguration();
+
+        resumeStrategyConfiguration.setCacheFillPolicy(super.cacheFillPolicy);
+        resumeStrategyConfiguration.setConsumerProperties(consumerProperties);
+        resumeStrategyConfiguration.setProducerProperties(producerProperties);
+        resumeStrategyConfiguration.setTopic(topic);
+
+        return resumeStrategyConfiguration;
+    }
+
+    /**
+     * Creates the most basic builder possible
+     * 
+     * @return a pre-configured basic builder
+     */
+    public static KafkaResumeStrategyConfigurationBuilder newBuilder() {
+        final Properties producerProperties = 
KafkaResumeStrategyConfigurationBuilder.createProducerProperties();
+        final Properties consumerProperties = 
KafkaResumeStrategyConfigurationBuilder.createConsumerProperties();
+
+        KafkaResumeStrategyConfigurationBuilder builder = new 
KafkaResumeStrategyConfigurationBuilder(
+                producerProperties,
+                consumerProperties);
+
+        String groupId = UUID.randomUUID().toString();
+        LOG.debug("Creating consumer with {}[{}]", 
ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        builder.withGroupId(groupId);
+        builder.withEnableAutoCommit(true);
+        builder.withCacheFillPolicy(Cacheable.FillPolicy.MAXIMIZING);
+
+        return builder;
+    }
+
+    /**
+     * Creates an empty builder
+     * 
+     * @return an empty configuration builder
+     */
+    public static KafkaResumeStrategyConfigurationBuilder newEmptyBuilder() {
+        final Properties producerProperties = new Properties();
+        final Properties consumerProperties = new Properties();
+
+        KafkaResumeStrategyConfigurationBuilder builder = new 
KafkaResumeStrategyConfigurationBuilder(
+                producerProperties,
+                consumerProperties);
+
+        String groupId = UUID.randomUUID().toString();
+        LOG.debug("Creating consumer with {}[{}]", 
ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        builder.withGroupId(groupId);
+        builder.withEnableAutoCommit(true);
+
+        return builder;
+    }
+}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java
index 7ece1b23699..c887a21b84b 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java
@@ -47,56 +47,26 @@ public class MultiNodeKafkaResumeStrategy<K extends Resumable> extends SingleNod
     /**
      * Create a new instance of this class
      * 
-     * @param bootstrapServers the address of the Kafka broker
-     * @param topic            the topic where to publish the offsets
+     * @param resumeStrategyConfiguration the configuration to use for this strategy instance
      */
-    public MultiNodeKafkaResumeStrategy(String bootstrapServers, String topic) 
{
+    public MultiNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration 
resumeStrategyConfiguration) {
         // just in case users don't want to provide their own worker thread pool
-        this(bootstrapServers, topic, Executors.newSingleThreadExecutor());
+        this(resumeStrategyConfiguration, Executors.newSingleThreadExecutor());
     }
 
     /**
      * Builds an instance of this class
      *
-     * @param bootstrapServers the address of the Kafka broker
-     * @param topic            the topic where to publish the offsets
-     * @param executorService  an executor service that will run a separate thread for periodically refreshing the
-     *                         offsets
+     * @param resumeStrategyConfiguration the configuration to use for this strategy instance
+     * @param executorService             an executor service that will run a separate thread for periodically
+     *                                    refreshing the offsets
      */
 
-    public MultiNodeKafkaResumeStrategy(String bootstrapServers, String topic, 
ExecutorService executorService) {
-        super(bootstrapServers, topic);
-
-        // We need to keep refreshing the cache
-        this.executorService = executorService;
-        executorService.submit(() -> refresh());
-    }
-
-    /**
-     * Builds an instance of this class
-     *
-     * @param topic          the topic where to publish the offsets
-     * @param producerConfig the set of properties to be used by the Kafka producer within this class
-     * @param consumerConfig the set of properties to be used by the Kafka consumer within this class
-     */
-    public MultiNodeKafkaResumeStrategy(String topic, Properties 
producerConfig, Properties consumerConfig) {
-        this(topic, producerConfig, consumerConfig, 
Executors.newSingleThreadExecutor());
-    }
-
-    /**
-     * Builds an instance of this class
-     *
-     * @param topic           the topic where to publish the offsets
-     * @param producerConfig  the set of properties to be used by the Kafka producer within this class
-     * @param consumerConfig  the set of properties to be used by the Kafka consumer within this class
-     * @param executorService an executor service that will run a separate thread for periodically refreshing the
-     *                        offsets
-     */
-
-    public MultiNodeKafkaResumeStrategy(String topic, Properties 
producerConfig, Properties consumerConfig,
+    public MultiNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration 
resumeStrategyConfiguration,
                                         ExecutorService executorService) {
-        super(topic, producerConfig, consumerConfig);
+        super(resumeStrategyConfiguration);
 
+        // We need to keep refreshing the cache
         this.executorService = executorService;
         executorService.submit(() -> refresh());
     }
@@ -132,11 +102,11 @@ public class MultiNodeKafkaResumeStrategy<K extends Resumable> extends SingleNod
     protected void refresh() {
         LOG.trace("Creating a offset cache refresher");
         try {
-            Properties prop = (Properties) getConsumerConfig().clone();
+            Properties prop = (Properties) 
getResumeStrategyConfiguration().getConsumerProperties().clone();
             prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
UUID.randomUUID().toString());
 
             try (Consumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(prop)) {
-                consumer.subscribe(Collections.singletonList(getTopic()));
+                
consumer.subscribe(Collections.singletonList(getResumeStrategyConfiguration().getTopic()));
 
                 poll(consumer);
             }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index 4de3f0acc7a..870185127de 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -23,9 +23,7 @@ import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Objects;
-import java.util.Properties;
 import java.util.Queue;
-import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 
@@ -38,21 +36,15 @@ import org.apache.camel.resume.Resumable;
 import org.apache.camel.resume.ResumeAdapter;
 import org.apache.camel.resume.cache.ResumeCache;
 import org.apache.camel.util.IOHelper;
-import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.StringHelper;
 import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,8 +57,6 @@ import org.slf4j.LoggerFactory;
 public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements 
KafkaResumeStrategy<T> {
     private static final Logger LOG = 
LoggerFactory.getLogger(SingleNodeKafkaResumeStrategy.class);
 
-    private final String topic;
-
     private Consumer<byte[], byte[]> consumer;
     private Producer<byte[], byte[]> producer;
     private Duration pollDuration = Duration.ofSeconds(1);
@@ -74,77 +64,17 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
     private final Queue<RecordError> producerErrors = new 
ConcurrentLinkedQueue<>();
 
     private boolean subscribed;
-    private final Properties producerConfig;
-    private final Properties consumerConfig;
     private ResumeAdapter adapter;
+    private final KafkaResumeStrategyConfiguration resumeStrategyConfiguration;
 
     /**
      * Builds an instance of this class
      * 
-     * @param bootstrapServers the address of the Kafka broker
-     * @param topic            the topic where to publish the offsets
+     * @param resumeStrategyConfiguration the configuration to use for this strategy instance
      *
      */
-    public SingleNodeKafkaResumeStrategy(String bootstrapServers, String 
topic) {
-        this(topic, createProducer(bootstrapServers), 
createConsumer(bootstrapServers));
-    }
-
-    /**
-     * Builds an instance of this class
-     *
-     * @param topic          the topic where to publish the offsets
-     * @param producerConfig the set of properties to be used by the Kafka producer within this class
-     * @param consumerConfig the set of properties to be used by the Kafka consumer within this class
-     */
-    public SingleNodeKafkaResumeStrategy(String topic, Properties 
producerConfig,
-                                         Properties consumerConfig) {
-        this.topic = ObjectHelper.notNull(topic, "The topic must not be null");
-        this.producerConfig = producerConfig;
-        this.consumerConfig = consumerConfig;
-
-        init();
-    }
-
-    /**
-     * Creates a basic string-based producer
-     * 
-     * @param  bootstrapServers the Kafka host
-     * @return                  A set of default properties for producing string-based key/pair records from Kafka
-     */
-    public static Properties createProducer(String bootstrapServers) {
-        Properties config = new Properties();
-
-        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
-        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
-
-        StringHelper.notEmpty(bootstrapServers, "bootstrapServers");
-        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-
-        return config;
-    }
-
-    /**
-     * Creates a basic string-based consumer
-     * 
-     * @param  bootstrapServers the Kafka host
-     * @return                  A set of default properties for consuming string-based key/pair records from Kafka
-     */
-    public static Properties createConsumer(String bootstrapServers) {
-        Properties config = new Properties();
-
-        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
-        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
-
-        StringHelper.notEmpty(bootstrapServers, "bootstrapServers");
-        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-
-        String groupId = UUID.randomUUID().toString();
-        LOG.debug("Creating consumer with {}[{}]", 
ConsumerConfig.GROUP_ID_CONFIG, groupId);
-
-        config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
-        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
Boolean.TRUE.toString());
-
-        return config;
+    public SingleNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration 
resumeStrategyConfiguration) {
+        this.resumeStrategyConfiguration = resumeStrategyConfiguration;
     }
 
     /**
@@ -158,7 +88,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
      *
      */
     protected void produce(byte[] key, byte[] message) throws 
ExecutionException, InterruptedException {
-        ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, 
key, message);
+        ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(resumeStrategyConfiguration.getTopic(), key, message);
 
         producer.send(record, (recordMetadata, e) -> {
             if (e != null) {
@@ -199,21 +129,11 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
      * @throws Exception
      */
     public void loadCache() throws Exception {
-        if (adapter instanceof Cacheable) {
-            Cacheable cacheable = (Cacheable) adapter;
-
-            if (cacheable.getFillPolicy() == Cacheable.FillPolicy.MAXIMIZING) {
-                
consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-            } else {
-                
consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
-            }
-        }
-
         createConsumer();
 
         subscribe();
 
-        LOG.debug("Loading records from topic {}", topic);
+        LOG.debug("Loading records from topic {}", 
resumeStrategyConfiguration.getTopic());
 
         if (!(adapter instanceof Deserializable)) {
             throw new RuntimeCamelException("Cannot load data for an adapter 
that is not deserializable");
@@ -311,9 +231,11 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
         try {
             consumer.unsubscribe();
         } catch (IllegalStateException e) {
-            LOG.warn("The consumer is likely already closed. Skipping 
unsubscribing from {}", topic);
+            LOG.warn("The consumer is likely already closed. Skipping 
unsubscribing from {}",
+                    resumeStrategyConfiguration.getTopic());
         } catch (Exception e) {
-            LOG.error("Error unsubscribing from the Kafka topic {}: {}", 
topic, e.getMessage(), e);
+            LOG.error("Error unsubscribing from the Kafka topic {}: {}", 
resumeStrategyConfiguration.getTopic(), e.getMessage(),
+                    e);
         }
     }
 
@@ -370,12 +292,12 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
             ResumeCache<?> cache = ((Cacheable) adapter).getCache();
 
             if (cache.capacity() >= 1) {
-                checkAndSubscribe(topic, cache.capacity());
+                checkAndSubscribe(resumeStrategyConfiguration.getTopic(), 
cache.capacity());
             } else {
-                checkAndSubscribe(topic);
+                checkAndSubscribe(resumeStrategyConfiguration.getTopic());
             }
         } else {
-            checkAndSubscribe(topic);
+            checkAndSubscribe(resumeStrategyConfiguration.getTopic());
         }
     }
 
@@ -410,13 +332,13 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
 
     private void createProducer() {
         if (producer == null) {
-            producer = new KafkaProducer<>(producerConfig);
+            producer = new 
KafkaProducer<>(resumeStrategyConfiguration.getProducerProperties());
         }
     }
 
     private void createConsumer() {
         if (consumer == null) {
-            consumer = new KafkaConsumer<>(consumerConfig);
+            consumer = new 
KafkaConsumer<>(resumeStrategyConfiguration.getConsumerProperties());
         }
     }
 
@@ -455,18 +377,6 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
         return producer;
     }
 
-    protected Properties getProducerConfig() {
-        return producerConfig;
-    }
-
-    protected Properties getConsumerConfig() {
-        return consumerConfig;
-    }
-
-    protected String getTopic() {
-        return topic;
-    }
-
     /**
      * Clear the producer errors
      */
@@ -474,4 +384,8 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka
         producerErrors.clear();
     }
 
+    protected KafkaResumeStrategyConfiguration 
getResumeStrategyConfiguration() {
+        return resumeStrategyConfiguration;
+    }
+
 }
diff --git a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategyConfiguration.java b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategyConfiguration.java
new file mode 100644
index 00000000000..441370e25ed
--- /dev/null
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategyConfiguration.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.resume;
+
+/**
+ * Basic configuration holder for resume strategies
+ */
+public class ResumeStrategyConfiguration {
+    private Cacheable.FillPolicy cacheFillPolicy;
+
+    /**
+     * Gets the {@link org.apache.camel.resume.Cacheable.FillPolicy} for the cache used in the strategy
+     *
+     * @return the fill policy to use
+     */
+    public Cacheable.FillPolicy getCacheFillPolicy() {
+        return cacheFillPolicy;
+    }
+
+    /**
+     * Sets the {@link org.apache.camel.resume.Cacheable.FillPolicy} for the cache used in the strategy
+     *
+     * @param cacheFillPolicy the fill policy to use
+     */
+    public void setCacheFillPolicy(Cacheable.FillPolicy cacheFillPolicy) {
+        this.cacheFillPolicy = cacheFillPolicy;
+    }
+}
diff --git a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategyConfigurationBuilder.java b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategyConfigurationBuilder.java
new file mode 100644
index 00000000000..b1992c86221
--- /dev/null
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategyConfigurationBuilder.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.resume;
+
+/**
+ * Provides a basic interface for implementing component-specific configuration builder
+ *
+ * @param <T> The {@link ResumeStrategyConfigurationBuilder} providing the custom configuration
+ * @param <Y> The type of the {@link ResumeStrategyConfiguration} that will be built by the builder
+ */
+public interface ResumeStrategyConfigurationBuilder<
+        T extends ResumeStrategyConfigurationBuilder, Y extends 
ResumeStrategyConfiguration> {
+
+    /**
+     * Sets the {@link org.apache.camel.resume.Cacheable.FillPolicy} for the cache used in the strategy
+     *
+     * @param cacheFillPolicy the fill policy to use
+     */
+    T withCacheFillPolicy(Cacheable.FillPolicy cacheFillPolicy);
+
+    /**
+     * Builds the resume strategy configuration
+     *
+     * @return a new instance of the resume strategy configuration
+     */
+    Y build();
+}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/resume/BasicResumeStrategyConfigurationBuilder.java b/core/camel-support/src/main/java/org/apache/camel/support/resume/BasicResumeStrategyConfigurationBuilder.java
new file mode 100644
index 00000000000..3821ac3142a
--- /dev/null
+++ b/core/camel-support/src/main/java/org/apache/camel/support/resume/BasicResumeStrategyConfigurationBuilder.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.support.resume;
+
+import org.apache.camel.resume.Cacheable;
+import org.apache.camel.resume.ResumeStrategyConfiguration;
+import org.apache.camel.resume.ResumeStrategyConfigurationBuilder;
+
+/**
+ * This class implements the most basic configuration set used by all resume strategy builders
+ *
+ * @param <T> The {@link ResumeStrategyConfigurationBuilder} providing the custom configuration
+ * @param <Y> The type of the {@link ResumeStrategyConfiguration} that will be built by the builder
+ */
+public abstract class BasicResumeStrategyConfigurationBuilder<
+        T extends BasicResumeStrategyConfigurationBuilder, Y extends 
ResumeStrategyConfiguration>
+        implements ResumeStrategyConfigurationBuilder<T, Y> {
+    protected Cacheable.FillPolicy cacheFillPolicy = 
Cacheable.FillPolicy.MAXIMIZING;
+
+    @Override
+    public T withCacheFillPolicy(Cacheable.FillPolicy fillPolicy) {
+        this.cacheFillPolicy = fillPolicy;
+
+        return (T) this;
+    }
+}

Reply via email to