This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 75a75ca47b0fbea3ef61a7a774b003b9f9800e20 Author: Otavio Rodolfo Piske <angusyo...@gmail.com> AuthorDate: Fri Jun 17 15:01:45 2022 +0200 CAMEL-18128: provide a way for integrations to configure specific details for the resume strategy --- .../kafka/KafkaResumeStrategyConfiguration.java | 61 +++++++ .../KafkaResumeStrategyConfigurationBuilder.java | 187 +++++++++++++++++++++ .../resume/kafka/MultiNodeKafkaResumeStrategy.java | 52 ++---- .../kafka/SingleNodeKafkaResumeStrategy.java | 124 +++----------- .../camel/resume/ResumeStrategyConfiguration.java | 43 +++++ .../resume/ResumeStrategyConfigurationBuilder.java | 42 +++++ .../BasicResumeStrategyConfigurationBuilder.java | 41 +++++ 7 files changed, 404 insertions(+), 146 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfiguration.java new file mode 100644 index 00000000000..2e314079223 --- /dev/null +++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfiguration.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.processor.resume.kafka; + +import java.util.Properties; + +import org.apache.camel.resume.ResumeStrategyConfiguration; + +/** + * A configuration suitable for using with the {@link KafkaResumeStrategy} and any of its implementations + */ +public class KafkaResumeStrategyConfiguration extends ResumeStrategyConfiguration { + private Properties producerProperties; + private Properties consumerProperties; + private String topic; + + public Properties getProducerProperties() { + return producerProperties; + } + + void setProducerProperties(Properties producerProperties) { + assert producerProperties != null; + + this.producerProperties = producerProperties; + } + + public Properties getConsumerProperties() { + return consumerProperties; + } + + void setConsumerProperties(Properties consumerProperties) { + assert consumerProperties != null; + + this.consumerProperties = consumerProperties; + } + + public String getTopic() { + return topic; + } + + void setTopic(String topic) { + assert topic != null; + + this.topic = topic; + } +} diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfigurationBuilder.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfigurationBuilder.java new file mode 100644 index 00000000000..150936148d0 --- /dev/null +++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategyConfigurationBuilder.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.processor.resume.kafka; + +import java.util.Properties; +import java.util.UUID; + +import org.apache.camel.resume.Cacheable; +import org.apache.camel.support.resume.BasicResumeStrategyConfigurationBuilder; +import org.apache.camel.util.ObjectHelper; +import org.apache.camel.util.StringHelper; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A configuration builder appropriate for building configurations for the {@link SingleNodeKafkaResumeStrategy}, + * {@link MultiNodeKafkaResumeStrategy} and any of their subclasses + */ +public class KafkaResumeStrategyConfigurationBuilder + extends + BasicResumeStrategyConfigurationBuilder<KafkaResumeStrategyConfigurationBuilder, KafkaResumeStrategyConfiguration> { + private static final Logger LOG = LoggerFactory.getLogger(KafkaResumeStrategyConfigurationBuilder.class); + + private Properties producerProperties; + private Properties consumerProperties; + private String topic; + + private KafkaResumeStrategyConfigurationBuilder() { + } + + public KafkaResumeStrategyConfigurationBuilder(Properties producerProperties, Properties consumerProperties) { + 
this.producerProperties = ObjectHelper.notNull(producerProperties, "producerProperties"); + this.consumerProperties = ObjectHelper.notNull(consumerProperties, "consumerProperties"); + } + + @Override + public KafkaResumeStrategyConfigurationBuilder withCacheFillPolicy(Cacheable.FillPolicy cacheFillPolicy) { + if (cacheFillPolicy == Cacheable.FillPolicy.MINIMIZING) { + consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); + } else { + consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + } + + return super.withCacheFillPolicy(cacheFillPolicy); + } + + public KafkaResumeStrategyConfigurationBuilder withProducerProperty(String key, Object value) { + producerProperties.put(key, value); + + return this; + } + + public KafkaResumeStrategyConfigurationBuilder withConsumerProperty(String key, Object value) { + consumerProperties.put(key, value); + + return this; + } + + public KafkaResumeStrategyConfigurationBuilder withGroupId(String value) { + consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, value); + + return this; + } + + public KafkaResumeStrategyConfigurationBuilder withEnableAutoCommit(boolean value) { + consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.valueOf(value)); + + return this; + } + + public KafkaResumeStrategyConfigurationBuilder withBootstrapServers(String value) { + final String bootstrapServers = StringHelper.notEmpty(value, "bootstrapServers"); + + consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + + return this; + } + + public KafkaResumeStrategyConfigurationBuilder withTopic(String value) { + this.topic = value; + + return this; + } + + /** + * Creates a basic consumer + * + * @return A set of default properties for consuming byte-based key/value records from Kafka + */ + public static Properties createConsumerProperties() { + Properties
config = new Properties(); + + config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + return config; + } + + /** + * Creates a basic producer + * + * @return A set of default properties for producing byte-based key/value records to Kafka + */ + public static Properties createProducerProperties() { + Properties config = new Properties(); + + config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + + return config; + } + + @Override + public KafkaResumeStrategyConfiguration build() { + KafkaResumeStrategyConfiguration resumeStrategyConfiguration = new KafkaResumeStrategyConfiguration(); + + resumeStrategyConfiguration.setCacheFillPolicy(super.cacheFillPolicy); + resumeStrategyConfiguration.setConsumerProperties(consumerProperties); + resumeStrategyConfiguration.setProducerProperties(producerProperties); + resumeStrategyConfiguration.setTopic(topic); + + return resumeStrategyConfiguration; + } + + /** + * Creates the most basic builder possible + * + * @return a pre-configured basic builder + */ + public static KafkaResumeStrategyConfigurationBuilder newBuilder() { + final Properties producerProperties = KafkaResumeStrategyConfigurationBuilder.createProducerProperties(); + final Properties consumerProperties = KafkaResumeStrategyConfigurationBuilder.createConsumerProperties(); + + KafkaResumeStrategyConfigurationBuilder builder = new KafkaResumeStrategyConfigurationBuilder( + producerProperties, + consumerProperties); + + String groupId = UUID.randomUUID().toString(); + LOG.debug("Creating consumer with {}[{}]", ConsumerConfig.GROUP_ID_CONFIG, groupId); + builder.withGroupId(groupId); + builder.withEnableAutoCommit(true); + builder.withCacheFillPolicy(Cacheable.FillPolicy.MAXIMIZING); + +
return builder; + } + + /** + * Creates an empty builder + * + * @return an empty configuration builder + */ + public static KafkaResumeStrategyConfigurationBuilder newEmptyBuilder() { + final Properties producerProperties = new Properties(); + final Properties consumerProperties = new Properties(); + + KafkaResumeStrategyConfigurationBuilder builder = new KafkaResumeStrategyConfigurationBuilder( + producerProperties, + consumerProperties); + + String groupId = UUID.randomUUID().toString(); + LOG.debug("Creating consumer with {}[{}]", ConsumerConfig.GROUP_ID_CONFIG, groupId); + builder.withGroupId(groupId); + builder.withEnableAutoCommit(true); + + return builder; + } +} diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java index 7ece1b23699..c887a21b84b 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java @@ -47,56 +47,26 @@ public class MultiNodeKafkaResumeStrategy<K extends Resumable> extends SingleNod /** * Create a new instance of this class * - * @param bootstrapServers the address of the Kafka broker - * @param topic the topic where to publish the offsets + * @param resumeStrategyConfiguration the configuration to use for this strategy instance */ - public MultiNodeKafkaResumeStrategy(String bootstrapServers, String topic) { + public MultiNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration resumeStrategyConfiguration) { // just in case users don't want to provide their own worker thread pool - this(bootstrapServers, topic, Executors.newSingleThreadExecutor()); + this(resumeStrategyConfiguration, Executors.newSingleThreadExecutor()); } /** * Builds an instance of this class * - * @param 
bootstrapServers the address of the Kafka broker - * @param topic the topic where to publish the offsets - * @param executorService an executor service that will run a separate thread for periodically refreshing the - * offsets + * @param resumeStrategyConfiguration the configuration to use for this strategy instance + * @param executorService an executor service that will run a separate thread for periodically + * refreshing the offsets */ - public MultiNodeKafkaResumeStrategy(String bootstrapServers, String topic, ExecutorService executorService) { - super(bootstrapServers, topic); - - // We need to keep refreshing the cache - this.executorService = executorService; - executorService.submit(() -> refresh()); - } - - /** - * Builds an instance of this class - * - * @param topic the topic where to publish the offsets - * @param producerConfig the set of properties to be used by the Kafka producer within this class - * @param consumerConfig the set of properties to be used by the Kafka consumer within this class - */ - public MultiNodeKafkaResumeStrategy(String topic, Properties producerConfig, Properties consumerConfig) { - this(topic, producerConfig, consumerConfig, Executors.newSingleThreadExecutor()); - } - - /** - * Builds an instance of this class - * - * @param topic the topic where to publish the offsets - * @param producerConfig the set of properties to be used by the Kafka producer within this class - * @param consumerConfig the set of properties to be used by the Kafka consumer within this class - * @param executorService an executor service that will run a separate thread for periodically refreshing the - * offsets - */ - - public MultiNodeKafkaResumeStrategy(String topic, Properties producerConfig, Properties consumerConfig, + public MultiNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration resumeStrategyConfiguration, ExecutorService executorService) { - super(topic, producerConfig, consumerConfig); + super(resumeStrategyConfiguration); + // We 
need to keep refreshing the cache this.executorService = executorService; executorService.submit(() -> refresh()); } @@ -132,11 +102,11 @@ public class MultiNodeKafkaResumeStrategy<K extends Resumable> extends SingleNod protected void refresh() { LOG.trace("Creating a offset cache refresher"); try { - Properties prop = (Properties) getConsumerConfig().clone(); + Properties prop = (Properties) getResumeStrategyConfiguration().getConsumerProperties().clone(); prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); try (Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(prop)) { - consumer.subscribe(Collections.singletonList(getTopic())); + consumer.subscribe(Collections.singletonList(getResumeStrategyConfiguration().getTopic())); poll(consumer); } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java index 4de3f0acc7a..870185127de 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java @@ -23,9 +23,7 @@ import java.time.Duration; import java.util.Collection; import java.util.Collections; import java.util.Objects; -import java.util.Properties; import java.util.Queue; -import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; @@ -38,21 +36,15 @@ import org.apache.camel.resume.Resumable; import org.apache.camel.resume.ResumeAdapter; import org.apache.camel.resume.cache.ResumeCache; import org.apache.camel.util.IOHelper; -import org.apache.camel.util.ObjectHelper; -import org.apache.camel.util.StringHelper; import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerConfig; 
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.serialization.ByteArrayDeserializer; -import org.apache.kafka.common.serialization.ByteArraySerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,8 +57,6 @@ import org.slf4j.LoggerFactory; public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements KafkaResumeStrategy<T> { private static final Logger LOG = LoggerFactory.getLogger(SingleNodeKafkaResumeStrategy.class); - private final String topic; - private Consumer<byte[], byte[]> consumer; private Producer<byte[], byte[]> producer; private Duration pollDuration = Duration.ofSeconds(1); @@ -74,77 +64,17 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka private final Queue<RecordError> producerErrors = new ConcurrentLinkedQueue<>(); private boolean subscribed; - private final Properties producerConfig; - private final Properties consumerConfig; private ResumeAdapter adapter; + private final KafkaResumeStrategyConfiguration resumeStrategyConfiguration; /** * Builds an instance of this class * - * @param bootstrapServers the address of the Kafka broker - * @param topic the topic where to publish the offsets + * @param resumeStrategyConfiguration the configuration to use for this strategy instance * */ - public SingleNodeKafkaResumeStrategy(String bootstrapServers, String topic) { - this(topic, createProducer(bootstrapServers), createConsumer(bootstrapServers)); - } - - /** - * Builds an instance of this class - * - * 
@param topic the topic where to publish the offsets - * @param producerConfig the set of properties to be used by the Kafka producer within this class - * @param consumerConfig the set of properties to be used by the Kafka consumer within this class - */ - public SingleNodeKafkaResumeStrategy(String topic, Properties producerConfig, - Properties consumerConfig) { - this.topic = ObjectHelper.notNull(topic, "The topic must not be null"); - this.producerConfig = producerConfig; - this.consumerConfig = consumerConfig; - - init(); - } - - /** - * Creates a basic string-based producer - * - * @param bootstrapServers the Kafka host - * @return A set of default properties for producing string-based key/pair records from Kafka - */ - public static Properties createProducer(String bootstrapServers) { - Properties config = new Properties(); - - config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); - config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); - - StringHelper.notEmpty(bootstrapServers, "bootstrapServers"); - config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - - return config; - } - - /** - * Creates a basic string-based consumer - * - * @param bootstrapServers the Kafka host - * @return A set of default properties for consuming string-based key/pair records from Kafka - */ - public static Properties createConsumer(String bootstrapServers) { - Properties config = new Properties(); - - config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); - config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); - - StringHelper.notEmpty(bootstrapServers, "bootstrapServers"); - config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - - String groupId = UUID.randomUUID().toString(); - LOG.debug("Creating consumer with {}[{}]", ConsumerConfig.GROUP_ID_CONFIG, groupId); - - 
config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); - config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.TRUE.toString()); - - return config; + public SingleNodeKafkaResumeStrategy(KafkaResumeStrategyConfiguration resumeStrategyConfiguration) { + this.resumeStrategyConfiguration = resumeStrategyConfiguration; } /** @@ -158,7 +88,7 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka * */ protected void produce(byte[] key, byte[] message) throws ExecutionException, InterruptedException { - ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, key, message); + ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(resumeStrategyConfiguration.getTopic(), key, message); producer.send(record, (recordMetadata, e) -> { if (e != null) { @@ -199,21 +129,11 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka * @throws Exception */ public void loadCache() throws Exception { - if (adapter instanceof Cacheable) { - Cacheable cacheable = (Cacheable) adapter; - - if (cacheable.getFillPolicy() == Cacheable.FillPolicy.MAXIMIZING) { - consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - } else { - consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); - } - } - createConsumer(); subscribe(); - LOG.debug("Loading records from topic {}", topic); + LOG.debug("Loading records from topic {}", resumeStrategyConfiguration.getTopic()); if (!(adapter instanceof Deserializable)) { throw new RuntimeCamelException("Cannot load data for an adapter that is not deserializable"); @@ -311,9 +231,11 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka try { consumer.unsubscribe(); } catch (IllegalStateException e) { - LOG.warn("The consumer is likely already closed. Skipping unsubscribing from {}", topic); + LOG.warn("The consumer is likely already closed. 
Skipping unsubscribing from {}", + resumeStrategyConfiguration.getTopic()); } catch (Exception e) { - LOG.error("Error unsubscribing from the Kafka topic {}: {}", topic, e.getMessage(), e); + LOG.error("Error unsubscribing from the Kafka topic {}: {}", resumeStrategyConfiguration.getTopic(), e.getMessage(), + e); } } @@ -370,12 +292,12 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka ResumeCache<?> cache = ((Cacheable) adapter).getCache(); if (cache.capacity() >= 1) { - checkAndSubscribe(topic, cache.capacity()); + checkAndSubscribe(resumeStrategyConfiguration.getTopic(), cache.capacity()); } else { - checkAndSubscribe(topic); + checkAndSubscribe(resumeStrategyConfiguration.getTopic()); } } else { - checkAndSubscribe(topic); + checkAndSubscribe(resumeStrategyConfiguration.getTopic()); } } @@ -410,13 +332,13 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka private void createProducer() { if (producer == null) { - producer = new KafkaProducer<>(producerConfig); + producer = new KafkaProducer<>(resumeStrategyConfiguration.getProducerProperties()); } } private void createConsumer() { if (consumer == null) { - consumer = new KafkaConsumer<>(consumerConfig); + consumer = new KafkaConsumer<>(resumeStrategyConfiguration.getConsumerProperties()); } } @@ -455,18 +377,6 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka return producer; } - protected Properties getProducerConfig() { - return producerConfig; - } - - protected Properties getConsumerConfig() { - return consumerConfig; - } - - protected String getTopic() { - return topic; - } - /** * Clear the producer errors */ @@ -474,4 +384,8 @@ public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements Kafka producerErrors.clear(); } + protected KafkaResumeStrategyConfiguration getResumeStrategyConfiguration() { + return resumeStrategyConfiguration; + } + } diff --git 
a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategyConfiguration.java b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategyConfiguration.java new file mode 100644 index 00000000000..441370e25ed --- /dev/null +++ b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategyConfiguration.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.camel.resume; + +/** + * Basic configuration holder for resume strategies + */ +public class ResumeStrategyConfiguration { + private Cacheable.FillPolicy cacheFillPolicy; + + /** + * Gets the {@link org.apache.camel.resume.Cacheable.FillPolicy} for the cache used in the strategy + * + * @return the fill policy to use + */ + public Cacheable.FillPolicy getCacheFillPolicy() { + return cacheFillPolicy; + } + + /** + * Sets the {@link org.apache.camel.resume.Cacheable.FillPolicy} for the cache used in the strategy + * + * @param cacheFillPolicy the fill policy to use + */ + public void setCacheFillPolicy(Cacheable.FillPolicy cacheFillPolicy) { + this.cacheFillPolicy = cacheFillPolicy; + } +} diff --git a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategyConfigurationBuilder.java b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategyConfigurationBuilder.java new file mode 100644 index 00000000000..b1992c86221 --- /dev/null +++ b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategyConfigurationBuilder.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.camel.resume; + +/** + * Provides a basic interface for implementing component-specific configuration builder + * + * @param <T> The {@link ResumeStrategyConfigurationBuilder} providing the custom configuration + * @param <Y> The type of the {@link ResumeStrategyConfiguration} that will be built by the builder + */ +public interface ResumeStrategyConfigurationBuilder< + T extends ResumeStrategyConfigurationBuilder, Y extends ResumeStrategyConfiguration> { + + /** + * Sets the {@link org.apache.camel.resume.Cacheable.FillPolicy} for the cache used in the strategy + * + * @param cacheFillPolicy the fill policy to use + */ + T withCacheFillPolicy(Cacheable.FillPolicy cacheFillPolicy); + + /** + * Builds the resume strategy configuration + * + * @return a new instance of the resume strategy configuration + */ + Y build(); +} diff --git a/core/camel-support/src/main/java/org/apache/camel/support/resume/BasicResumeStrategyConfigurationBuilder.java b/core/camel-support/src/main/java/org/apache/camel/support/resume/BasicResumeStrategyConfigurationBuilder.java new file mode 100644 index 00000000000..3821ac3142a --- /dev/null +++ b/core/camel-support/src/main/java/org/apache/camel/support/resume/BasicResumeStrategyConfigurationBuilder.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.support.resume; + +import org.apache.camel.resume.Cacheable; +import org.apache.camel.resume.ResumeStrategyConfiguration; +import org.apache.camel.resume.ResumeStrategyConfigurationBuilder; + +/** + * This class implements the most basic configuration set used by all resume strategy builders + * + * @param <T> The {@link ResumeStrategyConfigurationBuilder} providing the custom configuration + * @param <Y> The type of the {@link ResumeStrategyConfiguration} that will be built by the builder + */ +public abstract class BasicResumeStrategyConfigurationBuilder< + T extends BasicResumeStrategyConfigurationBuilder, Y extends ResumeStrategyConfiguration> + implements ResumeStrategyConfigurationBuilder<T, Y> { + protected Cacheable.FillPolicy cacheFillPolicy = Cacheable.FillPolicy.MAXIMIZING; + + @Override + public T withCacheFillPolicy(Cacheable.FillPolicy fillPolicy) { + this.cacheFillPolicy = fillPolicy; + + return (T) this; + } +}