This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 9aab52d4d37c20a254702527428f67e9f1a7b46b Author: Otavio Rodolfo Piske <[email protected]> AuthorDate: Wed Feb 2 17:00:49 2022 +0100 CAMEL-15562: preliminary implementation of the user-facing resume API --- .../apache/camel/catalog/schemas/camel-spring.xsd | 101 +++++++++ .../kinesis/consumer/KinesisResumeStrategy.java | 2 +- .../KinesisUserConfigurationResumeStrategy.java | 5 + .../couchdb/consumer/CouchDbResumeStrategy.java | 2 +- .../LatestUpdateSequenceResumeStrategy.java | 5 + .../camel/component/kafka/KafkaFetchRecords.java | 6 +- .../support/KafkaConsumerResumeStrategy.java | 2 +- .../support/OffsetKafkaConsumerResumeStrategy.java | 5 + .../support/PartitionAssignmentListener.java | 13 +- .../consumer/support/ResumeStrategyFactory.java | 5 + .../SeekPolicyKafkaConsumerResumeStrategy.java | 5 + .../resume/kafka/AbstractKafkaResumeStrategy.java | 58 ++++- .../KafkaConsumerWithResumeRouteStrategyIT.java | 239 +++++++++++++++++++++ .../KafkaConsumerWithResumeStrategyIT.java | 5 + .../src/test/resources/log4j2.properties | 5 +- .../main/java/org/apache/camel/ResumeAware.java | 10 +- .../main/java/org/apache/camel/ResumeStrategy.java | 9 +- .../src/main/java/org/apache/camel/Route.java | 5 + .../org/apache/camel/impl/engine/DefaultRoute.java | 11 + .../apache/camel/model/ProcessorDefinition.java | 13 ++ .../apache/camel/model/ResumableDefinition.java | 60 ++++++ .../processor/resume/ResumableCompletion.java | 61 ++++++ .../camel/processor/resume/ResumableProcessor.java | 151 +++++++++++++ .../org/apache/camel/reifier/ProcessorReifier.java | 3 + .../org/apache/camel/reifier/ResumableReifier.java | 57 +++++ .../FileConsumerResumeFromOffsetStrategyTest.java | 5 + .../file/FileConsumerResumeStrategyTest.java | 5 + 27 files changed, 820 insertions(+), 28 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd index f397503..2316c1d 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd @@ -10510,6 +10510,107 @@ Reference to the rest-dsl. </xs:attribute> </xs:complexType> + <xs:complexType name="resumableDefinition"> + <xs:complexContent> + <xs:extension base="tns:processorDefinition"> + <xs:sequence> + <xs:choice> + <xs:element ref="tns:expressionDefinition"/> + <xs:element ref="tns:csimple"/> + <xs:element ref="tns:constant"/> + <xs:element ref="tns:datasonnet"/> + <xs:element ref="tns:exchangeProperty"/> + <xs:element ref="tns:groovy"/> + <xs:element ref="tns:header"/> + <xs:element ref="tns:hl7terser"/> + <xs:element ref="tns:joor"/> + <xs:element ref="tns:jsonpath"/> + <xs:element ref="tns:language"/> + <xs:element ref="tns:method"/> + <xs:element ref="tns:mvel"/> + <xs:element ref="tns:ognl"/> + <xs:element ref="tns:ref"/> + <xs:element ref="tns:simple"/> + <xs:element ref="tns:spel"/> + <xs:element ref="tns:tokenize"/> + <xs:element ref="tns:xtokenize"/> + <xs:element ref="tns:xpath"/> + <xs:element ref="tns:xquery"/> + </xs:choice> + <xs:choice maxOccurs="unbounded" minOccurs="0"> + <xs:element ref="tns:aggregate"/> + <xs:element ref="tns:bean"/> + <xs:element ref="tns:doCatch"/> + <xs:element ref="tns:when"/> + <xs:element ref="tns:choice"/> + <xs:element ref="tns:otherwise"/> + <xs:element ref="tns:circuitBreaker"/> + <xs:element ref="tns:claimCheck"/> + <xs:element ref="tns:convertBodyTo"/> + <xs:element ref="tns:delay"/> + <xs:element ref="tns:dynamicRouter"/> + <xs:element ref="tns:enrich"/> + <xs:element ref="tns:filter"/> + <xs:element ref="tns:doFinally"/> + <xs:element ref="tns:idempotentConsumer"/> + <xs:element ref="tns:inOnly"/> + <xs:element ref="tns:inOut"/> + <xs:element ref="tns:intercept"/> + <xs:element ref="tns:interceptFrom"/> + <xs:element ref="tns:interceptSendToEndpoint"/> + <xs:element ref="tns:kamelet"/> + <xs:element ref="tns:loadBalance"/> + <xs:element ref="tns:log"/> + <xs:element ref="tns:loop"/> + <xs:element ref="tns:marshal"/> + <xs:element ref="tns:multicast"/> + <xs:element ref="tns:onCompletion"/> + <xs:element ref="tns:onException"/> + <xs:element ref="tns:onFallback"/> + <xs:element ref="tns:pipeline"/> + <xs:element ref="tns:policy"/> + <xs:element ref="tns:pollEnrich"/> + <xs:element ref="tns:process"/> + <xs:element ref="tns:recipientList"/> + <xs:element ref="tns:removeHeader"/> + <xs:element ref="tns:removeHeaders"/> + <xs:element ref="tns:removeProperties"/> + <xs:element ref="tns:removeProperty"/> + <xs:element ref="tns:resequence"/> + <xs:element ref="tns:rollback"/> + <xs:element ref="tns:route"/> + <xs:element ref="tns:routingSlip"/> + <xs:element ref="tns:saga"/> + <xs:element ref="tns:sample"/> + <xs:element ref="tns:script"/> + <xs:element ref="tns:setBody"/> + <xs:element ref="tns:setExchangePattern"/> + <xs:element ref="tns:setHeader"/> + <xs:element ref="tns:setProperty"/> + <xs:element ref="tns:sort"/> + <xs:element ref="tns:split"/> + <xs:element ref="tns:step"/> + <xs:element ref="tns:stop"/> + <xs:element ref="tns:doSwitch"/> + <xs:element ref="tns:threads"/> + <xs:element ref="tns:throttle"/> + <xs:element ref="tns:throwException"/> + <xs:element ref="tns:to"/> + <xs:element ref="tns:toD"/> + <xs:element ref="tns:transacted"/> + <xs:element ref="tns:transform"/> + <xs:element ref="tns:doTry"/> + <xs:element ref="tns:unmarshal"/> + <xs:element ref="tns:validate"/> + <xs:element ref="tns:wireTap"/> + <xs:element ref="tns:serviceCall"/> + </xs:choice> + </xs:sequence> + <xs:attribute name="resumeStrategyRef" type="xs:string" use="required"/> + </xs:extension> + </xs:complexContent> + </xs:complexType> + <xs:complexType name="rollbackDefinition"> <xs:complexContent> <xs:extension base="tns:noOutputDefinition"> diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeStrategy.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeStrategy.java index b6a000d..6c3ae6f 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeStrategy.java +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeStrategy.java @@ -20,7 +20,7 @@ package org.apache.camel.component.aws2.kinesis.consumer; import org.apache.camel.ResumeStrategy; import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; -public interface KinesisResumeStrategy extends ResumeStrategy<GetShardIteratorRequest.Builder> { +public interface KinesisResumeStrategy extends ResumeStrategy { void setRequestBuilder(GetShardIteratorRequest.Builder builder); } diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisUserConfigurationResumeStrategy.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisUserConfigurationResumeStrategy.java index 5633443..e5c92fc 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisUserConfigurationResumeStrategy.java +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisUserConfigurationResumeStrategy.java @@ -46,4 +46,9 @@ public class KinesisUserConfigurationResumeStrategy implements KinesisResumeStra resumable.startingSequenceNumber(configuration.getSequenceNumber()); } } + + @Override + public void start() throws Exception { + + } } diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeStrategy.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeStrategy.java index b03b31d..14d87d4 100644 --- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeStrategy.java +++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeStrategy.java @@ -22,6 +22,6 @@ import org.apache.camel.ResumeStrategy; /** * Defines a resumable strategy usable by the CouchDB component */ -public interface CouchDbResumeStrategy extends ResumeStrategy<CouchDbResumable> { +public interface CouchDbResumeStrategy extends ResumeStrategy { void setResumable(CouchDbResumable resumable); } diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/LatestUpdateSequenceResumeStrategy.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/LatestUpdateSequenceResumeStrategy.java index 277d842..04679d8 100644 --- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/LatestUpdateSequenceResumeStrategy.java +++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/LatestUpdateSequenceResumeStrategy.java @@ -36,4 +36,9 @@ public final class LatestUpdateSequenceResumeStrategy implements CouchDbResumeSt resumable.updateLastOffset(clientWrapper.getLatestUpdateSequence()); } + + @Override + public void start() throws Exception { + + } } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java index ed49fdf..deeef3c 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java @@ -28,6 +28,7 @@ import java.util.regex.Pattern; import org.apache.camel.component.kafka.consumer.CommitManager; import org.apache.camel.component.kafka.consumer.CommitManagers; +import org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy; import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessorFacade; import org.apache.camel.component.kafka.consumer.support.PartitionAssignmentListener; import org.apache.camel.component.kafka.consumer.support.ProcessingResult; @@ -136,9 +137,12 @@ class KafkaFetchRecords implements Runnable { } private void subscribe() { + KafkaConsumerResumeStrategy userProvidedStrategy + = kafkaConsumer.getEndpoint().getCamelContext().hasService(KafkaConsumerResumeStrategy.class); + PartitionAssignmentListener listener = new PartitionAssignmentListener( threadId, kafkaConsumer.getEndpoint().getConfiguration(), consumer, lastProcessedOffset, - this::isRunnable, commitManager); + this::isRunnable, commitManager, userProvidedStrategy); if (LOG.isInfoEnabled()) { LOG.info("Subscribing {} to {}", threadId, getPrintableTopic()); diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeStrategy.java index c433745..35eb879 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeStrategy.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeStrategy.java @@ -27,7 +27,7 @@ import org.apache.kafka.clients.consumer.Consumer; * the component is set up to use more than one of them. As such, implementations are responsible for ensuring the * thread-safety of the operations within the resume method. */ -public interface KafkaConsumerResumeStrategy extends ResumeStrategy<KafkaResumable> { +public interface KafkaConsumerResumeStrategy extends ResumeStrategy { void setConsumer(Consumer<?, ?> consumer); default void resume(KafkaResumable resumable) { diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeStrategy.java index 4502831..3875184 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeStrategy.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeStrategy.java @@ -73,4 +73,9 @@ public class OffsetKafkaConsumerResumeStrategy implements KafkaConsumerResumeStr public void resume(KafkaResumable resumable) { resume(); } + + @Override + public void start() throws Exception { + + } } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java index efedbe5..fe19147 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java @@ -38,7 +38,6 @@ public class PartitionAssignmentListener implements ConsumerRebalanceListener { private final String threadId; private final KafkaConfiguration configuration; - private final Consumer consumer; private final Map<String, Long> lastProcessedOffset; private final KafkaConsumerResumeStrategy resumeStrategy; private final CommitManager commitManager; @@ -46,15 +45,21 @@ public class PartitionAssignmentListener implements ConsumerRebalanceListener { public PartitionAssignmentListener(String threadId, KafkaConfiguration configuration, Consumer consumer, Map<String, Long> lastProcessedOffset, - Supplier<Boolean> stopStateSupplier, CommitManager commitManager) { + Supplier<Boolean> stopStateSupplier, CommitManager commitManager, + KafkaConsumerResumeStrategy resumeStrategy) { this.threadId = threadId; this.configuration = configuration; - this.consumer = consumer; this.lastProcessedOffset = lastProcessedOffset; this.commitManager = commitManager; this.stopStateSupplier = stopStateSupplier; - this.resumeStrategy = ResumeStrategyFactory.newResumeStrategy(configuration); + if (resumeStrategy == null) { + LOG.info("No resume strategy was provided ... checking for builtins ..."); + this.resumeStrategy = ResumeStrategyFactory.newResumeStrategy(configuration); + } else { + LOG.info("Using user-provided strategy"); + this.resumeStrategy = resumeStrategy; + } resumeStrategy.setConsumer(consumer); } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java index 3465ef5..67707de 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java @@ -40,6 +40,11 @@ public final class ResumeStrategyFactory { public void resume() { } + + @Override + public void start() throws Exception { + + } } private static final NoOpKafkaConsumerResumeStrategy NO_OP_RESUME_STRATEGY = new NoOpKafkaConsumerResumeStrategy(); diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java index 91db91b..46fd974 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java @@ -58,4 +58,9 @@ public class SeekPolicyKafkaConsumerResumeStrategy implements KafkaConsumerResum public void resume(KafkaResumable resumable) { resume(); } + + @Override + public void start() throws Exception { + + } } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java index c8bd037..4fad434 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java @@ -17,6 +17,7 @@ package org.apache.camel.processor.resume.kafka; +import java.io.IOException; import java.time.Duration; import java.util.ArrayList; import java.util.Collection; @@ -30,6 +31,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import org.apache.camel.Resumable; +import org.apache.camel.Service; import org.apache.camel.UpdatableConsumerResumeStrategy; import org.apache.camel.util.StringHelper; import org.apache.kafka.clients.consumer.Consumer; @@ -49,7 +51,9 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public abstract class AbstractKafkaResumeStrategy<K, V> implements UpdatableConsumerResumeStrategy<K, V, Resumable<K, V>> { +public abstract class AbstractKafkaResumeStrategy<K, V> + implements UpdatableConsumerResumeStrategy<K, V, Resumable<K, V>>, Service { + public static final int UNLIMITED = -1; private static final Logger LOG = LoggerFactory.getLogger(AbstractKafkaResumeStrategy.class); private final String topic; @@ -71,20 +75,13 @@ public abstract class AbstractKafkaResumeStrategy<K, V> implements UpdatableCons this.consumerConfig = createConsumer(bootstrapServers); } - public AbstractKafkaResumeStrategy(String bootstrapServers, String topic, Properties producerConfig, + public AbstractKafkaResumeStrategy(String topic, Properties producerConfig, Properties consumerConfig) { this.topic = topic; this.producerConfig = producerConfig; this.consumerConfig = consumerConfig; } - public void start() throws Exception { - consumer = new KafkaConsumer<>(consumerConfig); - producer = new KafkaProducer<>(producerConfig); - - loadProcessedItems(processedItems); - } - private Properties createProducer(String bootstrapServers) { Properties config = new Properties(); @@ -157,7 +154,7 @@ public abstract class AbstractKafkaResumeStrategy<K, V> implements UpdatableCons } protected void loadProcessedItems(Map<K, List<V>> processed) throws Exception { - loadProcessedItems(processed, -1); + loadProcessedItems(processed, UNLIMITED); } protected void loadProcessedItems(Map<K, List<V>> processed, int limit) throws Exception { @@ -181,7 +178,7 @@ public abstract class AbstractKafkaResumeStrategy<K, V> implements UpdatableCons var entries = processed.computeIfAbsent(record.key(), k -> new ArrayList<>()); entries.add(record.value()); - if (processed.size() >= limit) { + if (limit != UNLIMITED && processed.size() >= limit) { break; } } @@ -278,4 +275,43 @@ public abstract class AbstractKafkaResumeStrategy<K, V> implements UpdatableCons return Collections.unmodifiableList(sentItems); } + @Override + public void build() { + Service.super.build(); + } + + @Override + public void init() { + Service.super.init(); + + LOG.debug("Initializing the Kafka resume strategy"); + if (consumer == null) { + consumer = new KafkaConsumer<>(consumerConfig); + } + + if (producer == null) { + producer = new KafkaProducer<>(producerConfig); + } + } + + @Override + public void stop() { + + } + + @Override + public void close() throws IOException { + Service.super.close(); + } + + @Override + public void start() { + LOG.info("Starting the kafka resume strategy"); + + try { + loadProcessedItems(processedItems); + } catch (Exception e) { + LOG.error("Failed to load already processed items: {}", e.getMessage(), e); + } + } } diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java new file mode 100644 index 0000000..5b0e828 --- /dev/null +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.kafka.integration; + +import java.util.Collections; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.BindToRegistry; +import org.apache.camel.EndpointInject; +import org.apache.camel.Offset; +import org.apache.camel.Resumable; +import org.apache.camel.Service; +import org.apache.camel.UpdatableConsumerResumeStrategy; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy; +import org.apache.camel.component.kafka.consumer.support.KafkaResumable; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.resume.Resumables; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class KafkaConsumerWithResumeRouteStrategyIT extends BaseEmbeddedKafkaTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerWithResumeRouteStrategyIT.class); + + private static final String TOPIC = "resumable-route-tp"; + + @EndpointInject("mock:result") + private MockEndpoint result; + + @BindToRegistry("resumeStrategy") + private TestKafkaConsumerResumeStrategy resumeStrategy; + private CountDownLatch messagesLatch; + private static final int RANDOM_VALUE = ThreadLocalRandom.current().nextInt(1, 1000); + private KafkaProducer<Object, Object> producer; + + private static class TestKafkaConsumerResumeStrategy + implements KafkaConsumerResumeStrategy, + UpdatableConsumerResumeStrategy<String, Integer, Resumable<String, Integer>>, Service { + private final CountDownLatch messagesLatch; + private boolean resumeCalled; + private boolean consumerIsNull = true; + private boolean startCalled; + private boolean offsetNull = true; + private boolean offsetAddressableNull = true; + private boolean offsetAddressableEmpty = true; + private boolean offsetValueNull = true; + private boolean offsetValueEmpty = true; + private int lastOffset; + + public TestKafkaConsumerResumeStrategy(CountDownLatch messagesLatch) { + this.messagesLatch = messagesLatch; + } + + @Override + public void setConsumer(Consumer<?, ?> consumer) { + if (consumer != null) { + consumerIsNull = false; + } + } + + @Override + public void resume(KafkaResumable resumable) { + resumeCalled = true; + + } + + @Override + public void resume() { + resumeCalled = true; + } + + public boolean isResumeCalled() { + return resumeCalled; + } + + public boolean isConsumerIsNull() { + return consumerIsNull; + } + + @Override + public void start() { + LOG.warn("Start was called"); + startCalled = true; + } + + @Override + public void stop() { + + } + + @Override + public void init() { + Service.super.init(); + LOG.warn("Init was called"); + } + + public boolean isStartCalled() { + return startCalled; + } + + @Override + public void updateLastOffset(Resumable<String, Integer> offset) { + try { + if (offset != null) { + offsetNull = false; + + String addressable = offset.getAddressable(); + if (addressable != null) { + offsetAddressableNull = false; + offsetAddressableEmpty = addressable.isEmpty() || addressable.isBlank(); + + } + + Offset<Integer> offsetValue = offset.getLastOffset(); + if (offsetValue != null) { + offsetValueNull = false; + + if (offsetValue.offset() != null) { + offsetValueEmpty = false; + lastOffset = offsetValue.offset(); + } + } + } + } finally { + messagesLatch.countDown(); + } + } + + public boolean isOffsetNull() { + return offsetNull; + } + + public boolean isOffsetAddressableNull() { + return offsetAddressableNull; + } + + public boolean isOffsetValueNull() { + return offsetValueNull; + } + + public boolean isOffsetAddressableEmpty() { + return offsetAddressableEmpty; + } + + public boolean isOffsetValueEmpty() { + return offsetValueEmpty; + } + } + + @BeforeEach + public void before() { + Properties props = getDefaultProperties(); + + for (int i = 0; i < 10; i++) { + producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props); + producer.send(new ProducerRecord<>(TOPIC, String.valueOf(i))); + } + } + + @Override + protected void doPreSetup() throws Exception { + super.doPreSetup(); + + messagesLatch = new CountDownLatch(1); + resumeStrategy = new TestKafkaConsumerResumeStrategy(messagesLatch); + } + + @Test + // @Timeout(value = 30) + public void testOffsetIsBeingChecked() throws InterruptedException { + assertTrue(messagesLatch.await(100, TimeUnit.SECONDS), "The resume was not called"); + + assertTrue(resumeStrategy.isResumeCalled(), + "The resume strategy should have been called when the partition was assigned"); + assertFalse(resumeStrategy.isConsumerIsNull(), + "The consumer passed to the strategy should not be null"); + assertTrue(resumeStrategy.isStartCalled(), + "The resume strategy should have been started"); + assertFalse(resumeStrategy.isOffsetNull(), + "The offset should not be null"); + assertFalse(resumeStrategy.isOffsetAddressableNull(), + "The offset addressable should not be null"); + assertFalse(resumeStrategy.isOffsetAddressableEmpty(), + "The offset addressable should not be empty"); + assertFalse(resumeStrategy.isOffsetValueNull(), + "The offset value should not be null"); + assertFalse(resumeStrategy.isOffsetValueEmpty(), + "The offset value should not be empty"); + assertEquals(RANDOM_VALUE, resumeStrategy.lastOffset, "the offsets don't match"); + } + + @AfterEach + public void after() { + kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("kafka:" + TOPIC + "?groupId=" + TOPIC + "_GROUP&autoCommitIntervalMs=1000" + + "&autoOffsetReset=earliest&consumersCount=1") + .routeId("resume-strategy-route") + .setHeader("CamelOffset", + constant(Resumables.of("key", RANDOM_VALUE))) + .resumable().header("CamelOffset").resumableStrategyRef("resumeStrategy") + .to("mock:result"); + } + }; + } +} diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeStrategyIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeStrategyIT.java index c9f9b95..fb8bae5 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeStrategyIT.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeStrategyIT.java @@ -78,6 +78,11 @@ public class KafkaConsumerWithResumeStrategyIT extends BaseEmbeddedKafkaTestSupp public boolean isConsumerIsNull() { return consumerIsNull; } + + @Override + public void start() throws Exception { + + } } @Override diff --git a/components/camel-kafka/src/test/resources/log4j2.properties b/components/camel-kafka/src/test/resources/log4j2.properties index 1c24e24..72b19cf 100644 --- a/components/camel-kafka/src/test/resources/log4j2.properties +++ b/components/camel-kafka/src/test/resources/log4j2.properties @@ -32,11 +32,14 @@ logger.camel.name=org.apache.camel logger.camel.level=INFO logger.camelKafka.name=org.apache.camel.component.kafka -logger.camelKafka.level=INFO +logger.camelKafka.level=DEBUG logger.idem.name=org.apache.camel.processor.idempotent logger.idem.level=INFO +logger.resume.name=org.apache.camel.processor.resume.kafka +logger.resume.level=INFO + logger.kafka.name=org.apache.kafka logger.kafka.level=INFO diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeStrategy.java b/core/camel-api/src/main/java/org/apache/camel/ResumeAware.java similarity index 73% copy from components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeStrategy.java copy to core/camel-api/src/main/java/org/apache/camel/ResumeAware.java index b03b31d..a04168f 100644 --- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeStrategy.java +++ b/core/camel-api/src/main/java/org/apache/camel/ResumeAware.java @@ -15,13 +15,9 @@ * limitations under the License. */ -package org.apache.camel.component.couchdb.consumer; +package org.apache.camel; -import org.apache.camel.ResumeStrategy; +public interface ResumeAware<T extends ResumeStrategy> { -/** - * Defines a resumable strategy usable by the CouchDB component - */ -public interface CouchDbResumeStrategy extends ResumeStrategy<CouchDbResumable> { - void setResumable(CouchDbResumable resumable); + void setResumeStrategy(T resumeStrategy); } diff --git a/core/camel-api/src/main/java/org/apache/camel/ResumeStrategy.java b/core/camel-api/src/main/java/org/apache/camel/ResumeStrategy.java index e8c7f28..a271cd4 100644 --- a/core/camel-api/src/main/java/org/apache/camel/ResumeStrategy.java +++ b/core/camel-api/src/main/java/org/apache/camel/ResumeStrategy.java @@ -21,7 +21,7 @@ package org.apache.camel; * Defines a strategy for handling resume operations. Implementations can define different ways to handle how to resume * processing records. */ -public interface ResumeStrategy<T> { +public interface ResumeStrategy { /** * A consumer, iterator or value class that can be used to set the index position from which to resume from. The @@ -29,4 +29,11 @@ public interface ResumeStrategy<T> { * */ void resume(); + + /** + * Starts the resume strategy + * + * @throws Exception + */ + void start() throws Exception; } diff --git a/core/camel-api/src/main/java/org/apache/camel/Route.java b/core/camel-api/src/main/java/org/apache/camel/Route.java index a575690..2d4e0cd 100644 --- a/core/camel-api/src/main/java/org/apache/camel/Route.java +++ b/core/camel-api/src/main/java/org/apache/camel/Route.java @@ -373,4 +373,9 @@ public interface Route extends RuntimeConfiguration { */ void addErrorHandlerFactoryReference(ErrorHandlerFactory source, ErrorHandlerFactory target); + /** + * Sets the resume strategy for the route + */ + void setResumeStrategy(ResumeStrategy resumeStrategy); + } diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java index 49f2a62..c102175 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java @@ -33,6 +33,8 @@ import org.apache.camel.ErrorHandlerFactory; import org.apache.camel.NamedNode; import org.apache.camel.Navigate; import org.apache.camel.Processor; +import org.apache.camel.ResumeAware; +import org.apache.camel.ResumeStrategy; import org.apache.camel.Route; import org.apache.camel.RouteAware; import org.apache.camel.Service; @@ -87,6 +89,7 @@ public class DefaultRoute extends ServiceSupport implements Route { private ShutdownRunningTask shutdownRunningTask; private final Map<String, Processor> onCompletions = new HashMap<>(); private final Map<String, Processor> onExceptions = new HashMap<>(); + private ResumeStrategy resumeStrategy; // camel-core-model private ErrorHandlerFactory errorHandlerFactory; @@ -626,6 +629,10 @@ public class DefaultRoute extends ServiceSupport implements Route { if (consumer instanceof RouteIdAware) { ((RouteIdAware) consumer).setRouteId(this.getId()); } + + if (consumer instanceof ResumeAware) { + ((ResumeAware) consumer).setResumeStrategy(resumeStrategy); + } } if (processor instanceof Service) { services.add((Service) processor); @@ -699,4 +706,8 @@ public class DefaultRoute extends ServiceSupport implements Route { return consumer instanceof Suspendable && consumer instanceof SuspendableService; } + @Override + public void setResumeStrategy(ResumeStrategy resumeStrategy) { + this.resumeStrategy = resumeStrategy; + } } diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java index a5831df..cb0c74b 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java @@ -3774,6 +3774,19 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> return (Type) this; } + /** + * This defines the route as resumable, which allows the route to work with the endpoints and components to manage + * the state of consumers and resume upon restart + * + * @return The expression to create the Resumable + */ + public ExpressionClause<ResumableDefinition> resumable() { + ResumableDefinition answer = new ResumableDefinition(); + + addOutput(answer); + return createAndSetExpression(answer); + } + // Properties // ------------------------------------------------------------------------- diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java new file mode 100644 index 0000000..3a92d32 --- /dev/null +++ b/core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.model; + +import javax.xml.bind.annotation.XmlAttribute; +import javax.xml.bind.annotation.XmlTransient; + +import org.apache.camel.ResumeStrategy; + +public class ResumableDefinition extends OutputExpressionNode { + @XmlAttribute(required = true) + private String resumeStrategyRef; + + @XmlTransient + private ResumeStrategy resumeStrategy; + + @Override + public String getShortName() { + return "resumable"; + } + + public String getResumeStrategyRef() { + return resumeStrategyRef; + } + + public void setResumeStrategyRef(String resumeStrategyRef) { + this.resumeStrategyRef = resumeStrategyRef; + } + + public ResumeStrategy getResumeStrategy() { + return resumeStrategy; + } + + public void setResumeStrategy(ResumeStrategy resumeStrategy) { + this.resumeStrategy = resumeStrategy; + } + + // Fluent API + // ------------------------------------------------------------------------- + public ResumableDefinition resumableStrategyRef(String resumeStrategyRef) { + setResumeStrategyRef(resumeStrategyRef); + + return this; + } +} diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java new file mode 100644 index 0000000..62dba42 --- /dev/null +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.processor.resume; + +import org.apache.camel.Exchange; +import org.apache.camel.Resumable; +import org.apache.camel.ResumeStrategy; +import org.apache.camel.UpdatableConsumerResumeStrategy; +import org.apache.camel.spi.Synchronization; +import org.apache.camel.support.ExchangeHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ResumableCompletion implements Synchronization { + private static final Logger LOG = LoggerFactory.getLogger(ResumableCompletion.class); + + private final ResumeStrategy resumeStrategy; + private final Resumable<?, ?> resumable; + + public ResumableCompletion(ResumeStrategy resumeStrategy, Resumable<?, ?> resumable) { + this.resumeStrategy = resumeStrategy; + this.resumable = resumable; + } + + @Override + public void onComplete(Exchange exchange) { + if (!ExchangeHelper.isFailureHandled(exchange)) { + if (resumeStrategy instanceof UpdatableConsumerResumeStrategy) { + UpdatableConsumerResumeStrategy updatableConsumerResumeStrategy + = (UpdatableConsumerResumeStrategy) resumeStrategy; + try { + updatableConsumerResumeStrategy.updateLastOffset(resumable); + } catch (Exception e) { + LOG.error("Unable to update the offset: {}", e.getMessage(), e); + } + } else { + LOG.warn("Cannot perform an offset update because the strategy is not updatable"); + } + } + } + + @Override + public void onFailure(Exchange exchange) { + LOG.warn("Skipping offset update for {} due to failure in processing", resumable.getAddressable()); + } +} diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java new file mode 100644 index 0000000..35cf8af --- /dev/null +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.processor.resume; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.AsyncProcessor; +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; +import org.apache.camel.Exchange; +import org.apache.camel.Expression; +import org.apache.camel.Navigate; +import org.apache.camel.Processor; +import org.apache.camel.Resumable; +import org.apache.camel.ResumeStrategy; +import org.apache.camel.spi.IdAware; +import org.apache.camel.spi.RouteIdAware; +import org.apache.camel.spi.Synchronization; +import org.apache.camel.support.AsyncProcessorConverterHelper; +import org.apache.camel.support.AsyncProcessorSupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ResumableProcessor extends AsyncProcessorSupport + implements Navigate<Processor>, CamelContextAware, IdAware, RouteIdAware { + private static final Logger LOG = LoggerFactory.getLogger(ResumableProcessor.class); + private CamelContext camelContext; + private ResumeStrategy resumeStrategy; + private AsyncProcessor processor; + private final Expression offsetExpression; + private String id; + private String routeId; + + private static class ResumableProcessorCallback implements AsyncCallback { + + private final Exchange exchange; + private final Synchronization completion; + private final AsyncCallback callback; + + public ResumableProcessorCallback(Exchange exchange, Synchronization completion, AsyncCallback callback) { + this.exchange = exchange; + this.completion = completion; + this.callback = callback; + } + + @Override + public void done(boolean doneSync) { + try { + if (exchange.isFailed()) { + completion.onFailure(exchange); + } else { + completion.onComplete(exchange); + } + } finally { + callback.done(doneSync); + } + } + } + + public ResumableProcessor(Expression offsetExpression, ResumeStrategy resumeStrategy, Processor processor) { + this.resumeStrategy = Objects.requireNonNull(resumeStrategy); + this.processor = AsyncProcessorConverterHelper.convert(processor); + this.offsetExpression = offsetExpression; + + LOG.info("Enabling the resumable strategy of type: {}", resumeStrategy.getClass().getSimpleName()); + } + + @Override + public boolean process(final Exchange exchange, final AsyncCallback callback) { + Object offset = exchange.getMessage().getHeader("CamelOffset"); + + if (offset instanceof Resumable) { + Resumable<?, ?> resumable = (Resumable<?, ?>) offset; + + LOG.warn("Processing the resumable: {}", resumable.getAddressable()); + LOG.warn("Processing the resumable of type: {}", resumable.getLastOffset().offset()); + + final Synchronization onCompletion = new ResumableCompletion(resumeStrategy, resumable); + final AsyncCallback target = new ResumableProcessorCallback(exchange, onCompletion, callback); + return processor.process(exchange, target); + + } else { + LOG.warn("Cannot update the last offset because it's not available"); + } + + return processor.process(exchange, callback); + } + + @Override + public CamelContext getCamelContext() { + return camelContext; + } + + @Override + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } + + @Override + public String getId() { + return id; + } + + @Override + public void setId(String id) { + this.id = id; + } + + @Override + public String getRouteId() { + return routeId; + } + + @Override + public void setRouteId(String routeId) { + this.routeId = routeId; + } + + @Override + public List<Processor> next() { + if (!hasNext()) { + return null; + } + List<Processor> answer = new ArrayList<>(1); + answer.add(processor); + return answer; + } + + @Override + public boolean hasNext() { + return processor != null; + } +} diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java index 2ee8bbd..42695ca 100644 --- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java +++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java @@ -78,6 +78,7 @@ import org.apache.camel.model.RemoveHeadersDefinition; import org.apache.camel.model.RemovePropertiesDefinition; import org.apache.camel.model.RemovePropertyDefinition; import org.apache.camel.model.ResequenceDefinition; +import org.apache.camel.model.ResumableDefinition; import org.apache.camel.model.RollbackDefinition; import org.apache.camel.model.RouteDefinition; import org.apache.camel.model.RouteDefinitionHelper; @@ -313,6 +314,8 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> extends return new WhenSkipSendToEndpointReifier(route, definition); } else if (definition instanceof WhenDefinition) { return new WhenReifier(route, definition); + } else if (definition instanceof ResumableDefinition) { + return new ResumableReifier(route, definition); } return null; } diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java new file mode 100644 index 0000000..d6f683e --- /dev/null +++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.reifier; + +import org.apache.camel.Expression; +import org.apache.camel.Processor; +import org.apache.camel.ResumeStrategy; +import org.apache.camel.Route; +import org.apache.camel.model.ProcessorDefinition; +import org.apache.camel.model.ResumableDefinition; +import org.apache.camel.processor.resume.ResumableProcessor; +import org.apache.camel.util.ObjectHelper; + +public class ResumableReifier extends ExpressionReifier<ResumableDefinition> { + protected ResumableReifier(Route route, ProcessorDefinition<?> definition) { + super(route, ResumableDefinition.class.cast(definition)); + + } + + @Override + public Processor createProcessor() throws Exception { + Processor childProcessor = createChildProcessor(false); + + ResumeStrategy resumeStrategy = resolveResumeStrategy(); + ObjectHelper.notNull(resumeStrategy, "resumeStrategy", definition); + route.setResumeStrategy(resumeStrategy); + + Expression expression = createExpression(definition.getExpression()); + + return new ResumableProcessor(expression, resumeStrategy, childProcessor); + } + + protected ResumeStrategy resolveResumeStrategy() { + String ref = parseString(definition.getResumeStrategyRef()); + + if (ref != null) { + definition.setResumeStrategy(mandatoryLookup(ref, ResumeStrategy.class)); + } + + return definition.getResumeStrategy(); + } +} diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java index 62c4560..f12941d 100644 --- a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java @@ -44,6 +44,11 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport public void resume() { // NO-OP } + + @Override + public void start() throws Exception { + + } } @DisplayName("Tests whether we can resume from an offset") diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java index 129399c..d70134e 100644 --- a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java @@ -47,6 +47,11 @@ public class FileConsumerResumeStrategyTest extends ContextTestSupport { resumeSet.resumeEach(f -> !processedFiles.contains(f.getName())); } } + + @Override + public void start() throws Exception { + + } } @Test
