This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 9aab52d4d37c20a254702527428f67e9f1a7b46b
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Wed Feb 2 17:00:49 2022 +0100

    CAMEL-15562: preliminary implementation of the user-facing resume API
---
 .../apache/camel/catalog/schemas/camel-spring.xsd  | 101 +++++++++
 .../kinesis/consumer/KinesisResumeStrategy.java    |   2 +-
 .../KinesisUserConfigurationResumeStrategy.java    |   5 +
 .../couchdb/consumer/CouchDbResumeStrategy.java    |   2 +-
 .../LatestUpdateSequenceResumeStrategy.java        |   5 +
 .../camel/component/kafka/KafkaFetchRecords.java   |   6 +-
 .../support/KafkaConsumerResumeStrategy.java       |   2 +-
 .../support/OffsetKafkaConsumerResumeStrategy.java |   5 +
 .../support/PartitionAssignmentListener.java       |  13 +-
 .../consumer/support/ResumeStrategyFactory.java    |   5 +
 .../SeekPolicyKafkaConsumerResumeStrategy.java     |   5 +
 .../resume/kafka/AbstractKafkaResumeStrategy.java  |  58 ++++-
 .../KafkaConsumerWithResumeRouteStrategyIT.java    | 239 +++++++++++++++++++++
 .../KafkaConsumerWithResumeStrategyIT.java         |   5 +
 .../src/test/resources/log4j2.properties           |   5 +-
 .../main/java/org/apache/camel/ResumeAware.java    |  10 +-
 .../main/java/org/apache/camel/ResumeStrategy.java |   9 +-
 .../src/main/java/org/apache/camel/Route.java      |   5 +
 .../org/apache/camel/impl/engine/DefaultRoute.java |  11 +
 .../apache/camel/model/ProcessorDefinition.java    |  13 ++
 .../apache/camel/model/ResumableDefinition.java    |  60 ++++++
 .../processor/resume/ResumableCompletion.java      |  61 ++++++
 .../camel/processor/resume/ResumableProcessor.java | 151 +++++++++++++
 .../org/apache/camel/reifier/ProcessorReifier.java |   3 +
 .../org/apache/camel/reifier/ResumableReifier.java |  57 +++++
 .../FileConsumerResumeFromOffsetStrategyTest.java  |   5 +
 .../file/FileConsumerResumeStrategyTest.java       |   5 +
 27 files changed, 820 insertions(+), 28 deletions(-)

diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
index f397503..2316c1d 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
@@ -10510,6 +10510,107 @@ Reference to the rest-dsl.
     </xs:attribute>
   </xs:complexType>
 
+  <xs:complexType name="resumableDefinition">
+    <xs:complexContent>
+      <xs:extension base="tns:processorDefinition">
+        <xs:sequence>
+          <xs:choice>
+            <xs:element ref="tns:expressionDefinition"/>
+            <xs:element ref="tns:csimple"/>
+            <xs:element ref="tns:constant"/>
+            <xs:element ref="tns:datasonnet"/>
+            <xs:element ref="tns:exchangeProperty"/>
+            <xs:element ref="tns:groovy"/>
+            <xs:element ref="tns:header"/>
+            <xs:element ref="tns:hl7terser"/>
+            <xs:element ref="tns:joor"/>
+            <xs:element ref="tns:jsonpath"/>
+            <xs:element ref="tns:language"/>
+            <xs:element ref="tns:method"/>
+            <xs:element ref="tns:mvel"/>
+            <xs:element ref="tns:ognl"/>
+            <xs:element ref="tns:ref"/>
+            <xs:element ref="tns:simple"/>
+            <xs:element ref="tns:spel"/>
+            <xs:element ref="tns:tokenize"/>
+            <xs:element ref="tns:xtokenize"/>
+            <xs:element ref="tns:xpath"/>
+            <xs:element ref="tns:xquery"/>
+          </xs:choice>
+          <xs:choice maxOccurs="unbounded" minOccurs="0">
+            <xs:element ref="tns:aggregate"/>
+            <xs:element ref="tns:bean"/>
+            <xs:element ref="tns:doCatch"/>
+            <xs:element ref="tns:when"/>
+            <xs:element ref="tns:choice"/>
+            <xs:element ref="tns:otherwise"/>
+            <xs:element ref="tns:circuitBreaker"/>
+            <xs:element ref="tns:claimCheck"/>
+            <xs:element ref="tns:convertBodyTo"/>
+            <xs:element ref="tns:delay"/>
+            <xs:element ref="tns:dynamicRouter"/>
+            <xs:element ref="tns:enrich"/>
+            <xs:element ref="tns:filter"/>
+            <xs:element ref="tns:doFinally"/>
+            <xs:element ref="tns:idempotentConsumer"/>
+            <xs:element ref="tns:inOnly"/>
+            <xs:element ref="tns:inOut"/>
+            <xs:element ref="tns:intercept"/>
+            <xs:element ref="tns:interceptFrom"/>
+            <xs:element ref="tns:interceptSendToEndpoint"/>
+            <xs:element ref="tns:kamelet"/>
+            <xs:element ref="tns:loadBalance"/>
+            <xs:element ref="tns:log"/>
+            <xs:element ref="tns:loop"/>
+            <xs:element ref="tns:marshal"/>
+            <xs:element ref="tns:multicast"/>
+            <xs:element ref="tns:onCompletion"/>
+            <xs:element ref="tns:onException"/>
+            <xs:element ref="tns:onFallback"/>
+            <xs:element ref="tns:pipeline"/>
+            <xs:element ref="tns:policy"/>
+            <xs:element ref="tns:pollEnrich"/>
+            <xs:element ref="tns:process"/>
+            <xs:element ref="tns:recipientList"/>
+            <xs:element ref="tns:removeHeader"/>
+            <xs:element ref="tns:removeHeaders"/>
+            <xs:element ref="tns:removeProperties"/>
+            <xs:element ref="tns:removeProperty"/>
+            <xs:element ref="tns:resequence"/>
+            <xs:element ref="tns:rollback"/>
+            <xs:element ref="tns:route"/>
+            <xs:element ref="tns:routingSlip"/>
+            <xs:element ref="tns:saga"/>
+            <xs:element ref="tns:sample"/>
+            <xs:element ref="tns:script"/>
+            <xs:element ref="tns:setBody"/>
+            <xs:element ref="tns:setExchangePattern"/>
+            <xs:element ref="tns:setHeader"/>
+            <xs:element ref="tns:setProperty"/>
+            <xs:element ref="tns:sort"/>
+            <xs:element ref="tns:split"/>
+            <xs:element ref="tns:step"/>
+            <xs:element ref="tns:stop"/>
+            <xs:element ref="tns:doSwitch"/>
+            <xs:element ref="tns:threads"/>
+            <xs:element ref="tns:throttle"/>
+            <xs:element ref="tns:throwException"/>
+            <xs:element ref="tns:to"/>
+            <xs:element ref="tns:toD"/>
+            <xs:element ref="tns:transacted"/>
+            <xs:element ref="tns:transform"/>
+            <xs:element ref="tns:doTry"/>
+            <xs:element ref="tns:unmarshal"/>
+            <xs:element ref="tns:validate"/>
+            <xs:element ref="tns:wireTap"/>
+            <xs:element ref="tns:serviceCall"/>
+          </xs:choice>
+        </xs:sequence>
+        <xs:attribute name="resumeStrategyRef" type="xs:string" use="required"/>
+      </xs:extension>
+    </xs:complexContent>
+  </xs:complexType>
+
   <xs:complexType name="rollbackDefinition">
     <xs:complexContent>
       <xs:extension base="tns:noOutputDefinition">
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeStrategy.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeStrategy.java
index b6a000d..6c3ae6f 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeStrategy.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeStrategy.java
@@ -20,7 +20,7 @@ package org.apache.camel.component.aws2.kinesis.consumer;
 import org.apache.camel.ResumeStrategy;
 import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
 
-public interface KinesisResumeStrategy extends ResumeStrategy<GetShardIteratorRequest.Builder> {
+public interface KinesisResumeStrategy extends ResumeStrategy {
 
     void setRequestBuilder(GetShardIteratorRequest.Builder builder);
 }
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisUserConfigurationResumeStrategy.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisUserConfigurationResumeStrategy.java
index 5633443..e5c92fc 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisUserConfigurationResumeStrategy.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisUserConfigurationResumeStrategy.java
@@ -46,4 +46,9 @@ public class KinesisUserConfigurationResumeStrategy implements KinesisResumeStra
             resumable.startingSequenceNumber(configuration.getSequenceNumber());
         }
     }
+
+    @Override
+    public void start() throws Exception {
+
+    }
 }
diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeStrategy.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeStrategy.java
index b03b31d..14d87d4 100644
--- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeStrategy.java
+++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeStrategy.java
@@ -22,6 +22,6 @@ import org.apache.camel.ResumeStrategy;
 /**
  * Defines a resumable strategy usable by the CouchDB component
  */
-public interface CouchDbResumeStrategy extends ResumeStrategy<CouchDbResumable> {
+public interface CouchDbResumeStrategy extends ResumeStrategy {
     void setResumable(CouchDbResumable resumable);
 }
diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/LatestUpdateSequenceResumeStrategy.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/LatestUpdateSequenceResumeStrategy.java
index 277d842..04679d8 100644
--- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/LatestUpdateSequenceResumeStrategy.java
+++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/LatestUpdateSequenceResumeStrategy.java
@@ -36,4 +36,9 @@ public final class LatestUpdateSequenceResumeStrategy implements CouchDbResumeSt
 
         resumable.updateLastOffset(clientWrapper.getLatestUpdateSequence());
     }
+
+    @Override
+    public void start() throws Exception {
+
+    }
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index ed49fdf..deeef3c 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -28,6 +28,7 @@ import java.util.regex.Pattern;
 
 import org.apache.camel.component.kafka.consumer.CommitManager;
 import org.apache.camel.component.kafka.consumer.CommitManagers;
+import org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy;
 import org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessorFacade;
 import org.apache.camel.component.kafka.consumer.support.PartitionAssignmentListener;
 import org.apache.camel.component.kafka.consumer.support.ProcessingResult;
@@ -136,9 +137,12 @@ class KafkaFetchRecords implements Runnable {
     }
 
     private void subscribe() {
+        KafkaConsumerResumeStrategy userProvidedStrategy
+                = kafkaConsumer.getEndpoint().getCamelContext().hasService(KafkaConsumerResumeStrategy.class);
+
         PartitionAssignmentListener listener = new PartitionAssignmentListener(
                 threadId, kafkaConsumer.getEndpoint().getConfiguration(), consumer, lastProcessedOffset,
-                this::isRunnable, commitManager);
+                this::isRunnable, commitManager, userProvidedStrategy);
 
         if (LOG.isInfoEnabled()) {
             LOG.info("Subscribing {} to {}", threadId, getPrintableTopic());
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeStrategy.java
index c433745..35eb879 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaConsumerResumeStrategy.java
@@ -27,7 +27,7 @@ import org.apache.kafka.clients.consumer.Consumer;
  * the component is set up to use more than one of them. As such, implementations are responsible for ensuring the
  * thread-safety of the operations within the resume method.
  */
-public interface KafkaConsumerResumeStrategy extends ResumeStrategy<KafkaResumable> {
+public interface KafkaConsumerResumeStrategy extends ResumeStrategy {
     void setConsumer(Consumer<?, ?> consumer);
 
     default void resume(KafkaResumable resumable) {
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeStrategy.java
index 4502831..3875184 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/OffsetKafkaConsumerResumeStrategy.java
@@ -73,4 +73,9 @@ public class OffsetKafkaConsumerResumeStrategy implements KafkaConsumerResumeStr
     public void resume(KafkaResumable resumable) {
         resume();
     }
+
+    @Override
+    public void start() throws Exception {
+
+    }
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
index efedbe5..fe19147 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/PartitionAssignmentListener.java
@@ -38,7 +38,6 @@ public class PartitionAssignmentListener implements ConsumerRebalanceListener {
 
     private final String threadId;
     private final KafkaConfiguration configuration;
-    private final Consumer consumer;
     private final Map<String, Long> lastProcessedOffset;
     private final KafkaConsumerResumeStrategy resumeStrategy;
     private final CommitManager commitManager;
@@ -46,15 +45,21 @@ public class PartitionAssignmentListener implements ConsumerRebalanceListener {
 
     public PartitionAssignmentListener(String threadId, KafkaConfiguration configuration,
                                        Consumer consumer, Map<String, Long> lastProcessedOffset,
-                                       Supplier<Boolean> stopStateSupplier, CommitManager commitManager) {
+                                       Supplier<Boolean> stopStateSupplier, CommitManager commitManager,
+                                       KafkaConsumerResumeStrategy resumeStrategy) {
         this.threadId = threadId;
         this.configuration = configuration;
-        this.consumer = consumer;
         this.lastProcessedOffset = lastProcessedOffset;
         this.commitManager = commitManager;
         this.stopStateSupplier = stopStateSupplier;
 
-        this.resumeStrategy = ResumeStrategyFactory.newResumeStrategy(configuration);
+        if (resumeStrategy == null) {
+            LOG.info("No resume strategy was provided ... checking for builtins ...");
+            this.resumeStrategy = ResumeStrategyFactory.newResumeStrategy(configuration);
+        } else {
+            LOG.info("Using user-provided strategy");
+            this.resumeStrategy = resumeStrategy;
+        }
         resumeStrategy.setConsumer(consumer);
     }
 
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
index 3465ef5..67707de 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java
@@ -40,6 +40,11 @@ public final class ResumeStrategyFactory {
         public void resume() {
 
         }
+
+        @Override
+        public void start() throws Exception {
+
+        }
     }
 
     private static final NoOpKafkaConsumerResumeStrategy NO_OP_RESUME_STRATEGY = new NoOpKafkaConsumerResumeStrategy();
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java
index 91db91b..46fd974 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/SeekPolicyKafkaConsumerResumeStrategy.java
@@ -58,4 +58,9 @@ public class SeekPolicyKafkaConsumerResumeStrategy implements KafkaConsumerResum
     public void resume(KafkaResumable resumable) {
         resume();
     }
+
+    @Override
+    public void start() throws Exception {
+
+    }
 }
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java
index c8bd037..4fad434 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/AbstractKafkaResumeStrategy.java
@@ -17,6 +17,7 @@
 
 package org.apache.camel.processor.resume.kafka;
 
+import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -30,6 +31,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
 import org.apache.camel.Resumable;
+import org.apache.camel.Service;
 import org.apache.camel.UpdatableConsumerResumeStrategy;
 import org.apache.camel.util.StringHelper;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -49,7 +51,9 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public abstract class AbstractKafkaResumeStrategy<K, V> implements UpdatableConsumerResumeStrategy<K, V, Resumable<K, V>> {
+public abstract class AbstractKafkaResumeStrategy<K, V>
+        implements UpdatableConsumerResumeStrategy<K, V, Resumable<K, V>>, Service {
+    public static final int UNLIMITED = -1;
     private static final Logger LOG = LoggerFactory.getLogger(AbstractKafkaResumeStrategy.class);
 
     private final String topic;
@@ -71,20 +75,13 @@ public abstract class AbstractKafkaResumeStrategy<K, V> implements UpdatableCons
         this.consumerConfig = createConsumer(bootstrapServers);
     }
 
-    public AbstractKafkaResumeStrategy(String bootstrapServers, String topic, Properties producerConfig,
+    public AbstractKafkaResumeStrategy(String topic, Properties producerConfig,
                                        Properties consumerConfig) {
         this.topic = topic;
         this.producerConfig = producerConfig;
         this.consumerConfig = consumerConfig;
     }
 
-    public void start() throws Exception {
-        consumer = new KafkaConsumer<>(consumerConfig);
-        producer = new KafkaProducer<>(producerConfig);
-
-        loadProcessedItems(processedItems);
-    }
-
     private Properties createProducer(String bootstrapServers) {
         Properties config = new Properties();
 
@@ -157,7 +154,7 @@ public abstract class AbstractKafkaResumeStrategy<K, V> implements UpdatableCons
     }
 
     protected void loadProcessedItems(Map<K, List<V>> processed) throws Exception {
-        loadProcessedItems(processed, -1);
+        loadProcessedItems(processed, UNLIMITED);
     }
 
     protected void loadProcessedItems(Map<K, List<V>> processed, int limit) throws Exception {
@@ -181,7 +178,7 @@ public abstract class AbstractKafkaResumeStrategy<K, V> implements UpdatableCons
                 var entries = processed.computeIfAbsent(record.key(), k -> new ArrayList<>());
                 entries.add(record.value());
 
-                if (processed.size() >= limit) {
+                if (limit != UNLIMITED && processed.size() >= limit) {
                     break;
                 }
             }
@@ -278,4 +275,43 @@ public abstract class AbstractKafkaResumeStrategy<K, V> implements UpdatableCons
         return Collections.unmodifiableList(sentItems);
     }
 
+    @Override
+    public void build() {
+        Service.super.build();
+    }
+
+    @Override
+    public void init() {
+        Service.super.init();
+
+        LOG.debug("Initializing the Kafka resume strategy");
+        if (consumer == null) {
+            consumer = new KafkaConsumer<>(consumerConfig);
+        }
+
+        if (producer == null) {
+            producer = new KafkaProducer<>(producerConfig);
+        }
+    }
+
+    @Override
+    public void stop() {
+
+    }
+
+    @Override
+    public void close() throws IOException {
+        Service.super.close();
+    }
+
+    @Override
+    public void start() {
+        LOG.info("Starting the kafka resume strategy");
+
+        try {
+            loadProcessedItems(processedItems);
+        } catch (Exception e) {
+            LOG.error("Failed to load already processed items: {}", e.getMessage(), e);
+        }
+    }
 }
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
new file mode 100644
index 0000000..5b0e828
--- /dev/null
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Offset;
+import org.apache.camel.Resumable;
+import org.apache.camel.Service;
+import org.apache.camel.UpdatableConsumerResumeStrategy;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy;
+import org.apache.camel.component.kafka.consumer.support.KafkaResumable;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.resume.Resumables;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class KafkaConsumerWithResumeRouteStrategyIT extends BaseEmbeddedKafkaTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerWithResumeRouteStrategyIT.class);
+
+    private static final String TOPIC = "resumable-route-tp";
+
+    @EndpointInject("mock:result")
+    private MockEndpoint result;
+
+    @BindToRegistry("resumeStrategy")
+    private TestKafkaConsumerResumeStrategy resumeStrategy;
+    private CountDownLatch messagesLatch;
+    private static final int RANDOM_VALUE = ThreadLocalRandom.current().nextInt(1, 1000);
+    private KafkaProducer<Object, Object> producer;
+
+    private static class TestKafkaConsumerResumeStrategy
+            implements KafkaConsumerResumeStrategy,
+            UpdatableConsumerResumeStrategy<String, Integer, Resumable<String, Integer>>, Service {
+        private final CountDownLatch messagesLatch;
+        private boolean resumeCalled;
+        private boolean consumerIsNull = true;
+        private boolean startCalled;
+        private boolean offsetNull = true;
+        private boolean offsetAddressableNull = true;
+        private boolean offsetAddressableEmpty = true;
+        private boolean offsetValueNull = true;
+        private boolean offsetValueEmpty = true;
+        private int lastOffset;
+
+        public TestKafkaConsumerResumeStrategy(CountDownLatch messagesLatch) {
+            this.messagesLatch = messagesLatch;
+        }
+
+        @Override
+        public void setConsumer(Consumer<?, ?> consumer) {
+            if (consumer != null) {
+                consumerIsNull = false;
+            }
+        }
+
+        @Override
+        public void resume(KafkaResumable resumable) {
+            resumeCalled = true;
+
+        }
+
+        @Override
+        public void resume() {
+            resumeCalled = true;
+        }
+
+        public boolean isResumeCalled() {
+            return resumeCalled;
+        }
+
+        public boolean isConsumerIsNull() {
+            return consumerIsNull;
+        }
+
+        @Override
+        public void start() {
+            LOG.warn("Start was called");
+            startCalled = true;
+        }
+
+        @Override
+        public void stop() {
+
+        }
+
+        @Override
+        public void init() {
+            Service.super.init();
+            LOG.warn("Init was called");
+        }
+
+        public boolean isStartCalled() {
+            return startCalled;
+        }
+
+        @Override
+        public void updateLastOffset(Resumable<String, Integer> offset) {
+            try {
+                if (offset != null) {
+                    offsetNull = false;
+
+                    String addressable = offset.getAddressable();
+                    if (addressable != null) {
+                        offsetAddressableNull = false;
+                        offsetAddressableEmpty = addressable.isEmpty() || addressable.isBlank();
+
+                    }
+
+                    Offset<Integer> offsetValue = offset.getLastOffset();
+                    if (offsetValue != null) {
+                        offsetValueNull = false;
+
+                        if (offsetValue.offset() != null) {
+                            offsetValueEmpty = false;
+                            lastOffset = offsetValue.offset();
+                        }
+                    }
+                }
+            } finally {
+                messagesLatch.countDown();
+            }
+        }
+
+        public boolean isOffsetNull() {
+            return offsetNull;
+        }
+
+        public boolean isOffsetAddressableNull() {
+            return offsetAddressableNull;
+        }
+
+        public boolean isOffsetValueNull() {
+            return offsetValueNull;
+        }
+
+        public boolean isOffsetAddressableEmpty() {
+            return offsetAddressableEmpty;
+        }
+
+        public boolean isOffsetValueEmpty() {
+            return offsetValueEmpty;
+        }
+    }
+
+    @BeforeEach
+    public void before() {
+        Properties props = getDefaultProperties();
+
+        for (int i = 0; i < 10; i++) {
+            producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
+            producer.send(new ProducerRecord<>(TOPIC, String.valueOf(i)));
+        }
+    }
+
+    @Override
+    protected void doPreSetup() throws Exception {
+        super.doPreSetup();
+
+        messagesLatch = new CountDownLatch(1);
+        resumeStrategy = new TestKafkaConsumerResumeStrategy(messagesLatch);
+    }
+
+    @Test
+    //    @Timeout(value = 30)
+    public void testOffsetIsBeingChecked() throws InterruptedException {
+        assertTrue(messagesLatch.await(100, TimeUnit.SECONDS), "The resume was not called");
+
+        assertTrue(resumeStrategy.isResumeCalled(),
+                "The resume strategy should have been called when the partition was assigned");
+        assertFalse(resumeStrategy.isConsumerIsNull(),
+                "The consumer passed to the strategy should not be null");
+        assertTrue(resumeStrategy.isStartCalled(),
+                "The resume strategy should have been started");
+        assertFalse(resumeStrategy.isOffsetNull(),
+                "The offset should not be null");
+        assertFalse(resumeStrategy.isOffsetAddressableNull(),
+                "The offset addressable should not be null");
+        assertFalse(resumeStrategy.isOffsetAddressableEmpty(),
+                "The offset addressable should not be empty");
+        assertFalse(resumeStrategy.isOffsetValueNull(),
+                "The offset value should not be null");
+        assertFalse(resumeStrategy.isOffsetValueEmpty(),
+                "The offset value should not be empty");
+        assertEquals(RANDOM_VALUE, resumeStrategy.lastOffset, "the offsets don't match");
+    }
+
+    @AfterEach
+    public void after() {
+        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("kafka:" + TOPIC + "?groupId=" + TOPIC + "_GROUP&autoCommitIntervalMs=1000"
+                     + "&autoOffsetReset=earliest&consumersCount=1")
+                             .routeId("resume-strategy-route")
+                             .setHeader("CamelOffset",
+                                     constant(Resumables.of("key", RANDOM_VALUE)))
+                             .resumable().header("CamelOffset").resumableStrategyRef("resumeStrategy")
+                             .to("mock:result");
+            }
+        };
+    }
+}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeStrategyIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeStrategyIT.java
index c9f9b95..fb8bae5 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeStrategyIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeStrategyIT.java
@@ -78,6 +78,11 @@ public class KafkaConsumerWithResumeStrategyIT extends 
BaseEmbeddedKafkaTestSupp
         public boolean isConsumerIsNull() {
             return consumerIsNull;
         }
+
+        @Override
+        public void start() throws Exception {
+            // NO-OP
+        }
     }
 
     @Override
diff --git a/components/camel-kafka/src/test/resources/log4j2.properties 
b/components/camel-kafka/src/test/resources/log4j2.properties
index 1c24e24..72b19cf 100644
--- a/components/camel-kafka/src/test/resources/log4j2.properties
+++ b/components/camel-kafka/src/test/resources/log4j2.properties
@@ -32,11 +32,14 @@ logger.camel.name=org.apache.camel
 logger.camel.level=INFO
 
 logger.camelKafka.name=org.apache.camel.component.kafka
-logger.camelKafka.level=INFO
+logger.camelKafka.level=DEBUG
 
 logger.idem.name=org.apache.camel.processor.idempotent
 logger.idem.level=INFO
 
+logger.resume.name=org.apache.camel.processor.resume.kafka
+logger.resume.level=INFO
+
 logger.kafka.name=org.apache.kafka
 logger.kafka.level=INFO
 
diff --git 
a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeStrategy.java
 b/core/camel-api/src/main/java/org/apache/camel/ResumeAware.java
similarity index 73%
copy from 
components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeStrategy.java
copy to core/camel-api/src/main/java/org/apache/camel/ResumeAware.java
index b03b31d..a04168f 100644
--- 
a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeStrategy.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ResumeAware.java
@@ -15,13 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.camel.component.couchdb.consumer;
+package org.apache.camel;
 
-import org.apache.camel.ResumeStrategy;
+/**
+ * Implemented by types that can be given a {@link ResumeStrategy}.
+ *
+ * @param <T> the kind of resume strategy accepted by the implementation
+ */
+public interface ResumeAware<T extends ResumeStrategy> {
 
-/**
- * Defines a resumable strategy usable by the CouchDB component
- */
-public interface CouchDbResumeStrategy extends 
ResumeStrategy<CouchDbResumable> {
-    void setResumable(CouchDbResumable resumable);
+    /**
+     * Sets the resume strategy on this instance
+     *
+     * @param resumeStrategy the resume strategy to use
+     */
+    void setResumeStrategy(T resumeStrategy);
 }
diff --git a/core/camel-api/src/main/java/org/apache/camel/ResumeStrategy.java 
b/core/camel-api/src/main/java/org/apache/camel/ResumeStrategy.java
index e8c7f28..a271cd4 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ResumeStrategy.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ResumeStrategy.java
@@ -21,7 +21,7 @@ package org.apache.camel;
  * Defines a strategy for handling resume operations. Implementations can 
define different ways to handle how to resume
  * processing records.
  */
-public interface ResumeStrategy<T> {
+public interface ResumeStrategy {
 
     /**
      * A consumer, iterator or value class that can be used to set the index 
position from which to resume from. The
@@ -29,4 +29,11 @@ public interface ResumeStrategy<T> {
      *
      */
     void resume();
+
+    /**
+     * Starts the resume strategy
+     *
+     * @throws Exception if the resume strategy cannot be started
+     */
+    void start() throws Exception;
 }
diff --git a/core/camel-api/src/main/java/org/apache/camel/Route.java 
b/core/camel-api/src/main/java/org/apache/camel/Route.java
index a575690..2d4e0cd 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Route.java
+++ b/core/camel-api/src/main/java/org/apache/camel/Route.java
@@ -373,4 +373,9 @@ public interface Route extends RuntimeConfiguration {
      */
     void addErrorHandlerFactoryReference(ErrorHandlerFactory source, 
ErrorHandlerFactory target);
 
+    /**
+     * Sets the resume strategy for the route
+     *
+     * @param resumeStrategy the resume strategy to associate with this route
+     */
+    void setResumeStrategy(ResumeStrategy resumeStrategy);
+
 }
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
index 49f2a62..c102175 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
@@ -33,6 +33,8 @@ import org.apache.camel.ErrorHandlerFactory;
 import org.apache.camel.NamedNode;
 import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
+import org.apache.camel.ResumeAware;
+import org.apache.camel.ResumeStrategy;
 import org.apache.camel.Route;
 import org.apache.camel.RouteAware;
 import org.apache.camel.Service;
@@ -87,6 +89,7 @@ public class DefaultRoute extends ServiceSupport implements 
Route {
     private ShutdownRunningTask shutdownRunningTask;
     private final Map<String, Processor> onCompletions = new HashMap<>();
     private final Map<String, Processor> onExceptions = new HashMap<>();
+    private ResumeStrategy resumeStrategy;
 
     // camel-core-model
     private ErrorHandlerFactory errorHandlerFactory;
@@ -626,6 +629,10 @@ public class DefaultRoute extends ServiceSupport 
implements Route {
             if (consumer instanceof RouteIdAware) {
                 ((RouteIdAware) consumer).setRouteId(this.getId());
             }
+
+            if (consumer instanceof ResumeAware) {
+                ((ResumeAware) consumer).setResumeStrategy(resumeStrategy);
+            }
         }
         if (processor instanceof Service) {
             services.add((Service) processor);
@@ -699,4 +706,8 @@ public class DefaultRoute extends ServiceSupport implements 
Route {
         return consumer instanceof Suspendable && consumer instanceof 
SuspendableService;
     }
 
+    @Override
+    public void setResumeStrategy(ResumeStrategy resumeStrategy) {
+        // kept so that it can be passed on to a consumer that implements ResumeAware
+        this.resumeStrategy = resumeStrategy;
+    }
 }
diff --git 
a/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java
 
b/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index a5831df..cb0c74b 100644
--- 
a/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ 
b/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -3774,6 +3774,19 @@ public abstract class ProcessorDefinition<Type extends 
ProcessorDefinition<Type>
         return (Type) this;
     }
 
+    /**
+     * Marks the route as resumable, so that it can cooperate with the endpoints and components to keep track of
+     * the state of consumers and resume upon restart
+     *
+     * @return the expression clause used to configure the new {@link ResumableDefinition}
+     */
+    public ExpressionClause<ResumableDefinition> resumable() {
+        final ResumableDefinition resumableDefinition = new ResumableDefinition();
+        addOutput(resumableDefinition);
+
+        return createAndSetExpression(resumableDefinition);
+    }
+
     // Properties
     // 
-------------------------------------------------------------------------
 
diff --git 
a/core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java
 
b/core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java
new file mode 100644
index 0000000..3a92d32
--- /dev/null
+++ 
b/core/camel-core-model/src/main/java/org/apache/camel/model/ResumableDefinition.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.model;
+
+import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlTransient;
+
+import org.apache.camel.ResumeStrategy;
+
+/**
+ * Model definition behind the {@code resumable} DSL element (see {@link #getShortName()}). It carries either the
+ * reference name of a {@link ResumeStrategy} or the strategy instance itself.
+ */
+public class ResumableDefinition extends OutputExpressionNode {
+    // reference name of the resume strategy; mapped as a required XML attribute
+    @XmlAttribute(required = true)
+    private String resumeStrategyRef;
+
+    // the strategy instance; excluded from the XML mapping
+    @XmlTransient
+    private ResumeStrategy resumeStrategy;
+
+    @Override
+    public String getShortName() {
+        return "resumable";
+    }
+
+    /**
+     * @return the reference name of the resume strategy, as configured
+     */
+    public String getResumeStrategyRef() {
+        return resumeStrategyRef;
+    }
+
+    /**
+     * @param resumeStrategyRef the reference name of the resume strategy
+     */
+    public void setResumeStrategyRef(String resumeStrategyRef) {
+        this.resumeStrategyRef = resumeStrategyRef;
+    }
+
+    /**
+     * @return the resume strategy instance held by this definition, if one has been set
+     */
+    public ResumeStrategy getResumeStrategy() {
+        return resumeStrategy;
+    }
+
+    /**
+     * @param resumeStrategy the resume strategy instance to hold
+     */
+    public void setResumeStrategy(ResumeStrategy resumeStrategy) {
+        this.resumeStrategy = resumeStrategy;
+    }
+
+    // Fluent API
+    // -------------------------------------------------------------------------
+
+    /**
+     * Sets the reference name of the resume strategy.
+     *
+     * NOTE(review): this method is named "resumable..." while the property it sets is "resumeStrategyRef" -
+     * confirm whether the naming difference is intended.
+     *
+     * @param  resumeStrategyRef the reference name of the resume strategy
+     * @return                   this definition, to allow chaining
+     */
+    public ResumableDefinition resumableStrategyRef(String resumeStrategyRef) {
+        setResumeStrategyRef(resumeStrategyRef);
+
+        return this;
+    }
+}
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java
new file mode 100644
index 0000000..62dba42
--- /dev/null
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.processor.resume;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Resumable;
+import org.apache.camel.ResumeStrategy;
+import org.apache.camel.UpdatableConsumerResumeStrategy;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.support.ExchangeHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link Synchronization} that hands a {@link Resumable} over to the resume strategy once the exchange has been
+ * completed, so that the strategy can update its last offset.
+ */
+public class ResumableCompletion implements Synchronization {
+    private static final Logger LOG = LoggerFactory.getLogger(ResumableCompletion.class);
+
+    private final ResumeStrategy resumeStrategy;
+    private final Resumable<?, ?> resumable;
+
+    /**
+     * @param resumeStrategy the strategy that may receive the offset update
+     * @param resumable      the resumable passed to the strategy on completion
+     */
+    public ResumableCompletion(ResumeStrategy resumeStrategy, Resumable<?, ?> resumable) {
+        this.resumeStrategy = resumeStrategy;
+        this.resumable = resumable;
+    }
+
+    /**
+     * Updates the last offset on the strategy. Nothing is done when the exchange is flagged as failure handled,
+     * and only strategies implementing {@link UpdatableConsumerResumeStrategy} can be updated.
+     */
+    @Override
+    public void onComplete(Exchange exchange) {
+        if (ExchangeHelper.isFailureHandled(exchange)) {
+            return;
+        }
+
+        if (!(resumeStrategy instanceof UpdatableConsumerResumeStrategy)) {
+            LOG.warn("Cannot perform an offset update because the strategy is not updatable");
+            return;
+        }
+
+        final UpdatableConsumerResumeStrategy updatable = (UpdatableConsumerResumeStrategy) resumeStrategy;
+        try {
+            updatable.updateLastOffset(resumable);
+        } catch (Exception e) {
+            // a failed update is reported but not propagated
+            LOG.error("Unable to update the offset: {}", e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Only logs that the offset update is skipped for the addressable of the resumable.
+     */
+    @Override
+    public void onFailure(Exchange exchange) {
+        LOG.warn("Skipping offset update for {} due to failure in processing", resumable.getAddressable());
+    }
+}
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java
new file mode 100644
index 0000000..35cf8af
--- /dev/null
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableProcessor.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.processor.resume;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
+import org.apache.camel.Navigate;
+import org.apache.camel.Processor;
+import org.apache.camel.Resumable;
+import org.apache.camel.ResumeStrategy;
+import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteIdAware;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.support.AsyncProcessorConverterHelper;
+import org.apache.camel.support.AsyncProcessorSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Processor that wraps a child processor and, when the exchange carries a {@link Resumable} in the
+ * {@code CamelOffset} header, notifies a {@link ResumableCompletion} once the child processor is done so that the
+ * {@link ResumeStrategy} can be updated.
+ */
+public class ResumableProcessor extends AsyncProcessorSupport
+        implements Navigate<Processor>, CamelContextAware, IdAware, RouteIdAware {
+    private static final Logger LOG = LoggerFactory.getLogger(ResumableProcessor.class);
+    // name of the header the resumable is read from
+    private static final String OFFSET_HEADER = "CamelOffset";
+
+    private CamelContext camelContext;
+    private final ResumeStrategy resumeStrategy;
+    private final AsyncProcessor processor;
+    // NOTE(review): stored but never evaluated in this class; process() reads the header directly - confirm
+    // whether the expression is meant to be used to obtain the offset
+    private final Expression offsetExpression;
+    private String id;
+    private String routeId;
+
+    /**
+     * Callback that notifies the completion about the outcome of the exchange before delegating to the original
+     * callback.
+     */
+    private static class ResumableProcessorCallback implements AsyncCallback {
+
+        private final Exchange exchange;
+        private final Synchronization completion;
+        private final AsyncCallback callback;
+
+        public ResumableProcessorCallback(Exchange exchange, Synchronization completion, AsyncCallback callback) {
+            this.exchange = exchange;
+            this.completion = completion;
+            this.callback = callback;
+        }
+
+        @Override
+        public void done(boolean doneSync) {
+            try {
+                if (exchange.isFailed()) {
+                    completion.onFailure(exchange);
+                } else {
+                    completion.onComplete(exchange);
+                }
+            } finally {
+                // the original callback must always be invoked, even if the completion throws
+                callback.done(doneSync);
+            }
+        }
+    }
+
+    /**
+     * @param  offsetExpression     the offset expression configured for the resumable
+     * @param  resumeStrategy       the resume strategy to notify; must not be null
+     * @param  processor            the child processor to delegate to
+     * @throws NullPointerException if the resume strategy is null
+     */
+    public ResumableProcessor(Expression offsetExpression, ResumeStrategy resumeStrategy, Processor processor) {
+        this.resumeStrategy = Objects.requireNonNull(resumeStrategy, "resumeStrategy");
+        this.processor = AsyncProcessorConverterHelper.convert(processor);
+        this.offsetExpression = offsetExpression;
+
+        LOG.info("Enabling the resumable strategy of type: {}", resumeStrategy.getClass().getSimpleName());
+    }
+
+    /**
+     * Delegates to the child processor. When the {@code CamelOffset} header holds a {@link Resumable}, the callback
+     * is wrapped so that the completion runs after the child processor is done; otherwise a warning is logged and
+     * the exchange is processed with the callback as given.
+     */
+    @Override
+    public boolean process(final Exchange exchange, final AsyncCallback callback) {
+        final Object offset = exchange.getMessage().getHeader(OFFSET_HEADER);
+
+        if (!(offset instanceof Resumable)) {
+            LOG.warn("Cannot update the last offset because it's not available");
+            return processor.process(exchange, callback);
+        }
+
+        final Resumable<?, ?> resumable = (Resumable<?, ?>) offset;
+
+        // emitted for every exchange, so this is diagnostic output and not a warning
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Processing the resumable: {}", resumable.getAddressable());
+            LOG.debug("Processing the resumable with offset: {}", resumable.getLastOffset().offset());
+        }
+
+        final Synchronization onCompletion = new ResumableCompletion(resumeStrategy, resumable);
+        final AsyncCallback target = new ResumableProcessorCallback(exchange, onCompletion, callback);
+        return processor.process(exchange, target);
+    }
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    public String getId() {
+        return id;
+    }
+
+    @Override
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    @Override
+    public String getRouteId() {
+        return routeId;
+    }
+
+    @Override
+    public void setRouteId(String routeId) {
+        this.routeId = routeId;
+    }
+
+    /**
+     * @return a list holding the child processor, or null when there is none
+     */
+    @Override
+    public List<Processor> next() {
+        if (!hasNext()) {
+            return null;
+        }
+        List<Processor> answer = new ArrayList<>(1);
+        answer.add(processor);
+        return answer;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return processor != null;
+    }
+}
diff --git 
a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
 
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
index 2ee8bbd..42695ca 100644
--- 
a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
+++ 
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
@@ -78,6 +78,7 @@ import org.apache.camel.model.RemoveHeadersDefinition;
 import org.apache.camel.model.RemovePropertiesDefinition;
 import org.apache.camel.model.RemovePropertyDefinition;
 import org.apache.camel.model.ResequenceDefinition;
+import org.apache.camel.model.ResumableDefinition;
 import org.apache.camel.model.RollbackDefinition;
 import org.apache.camel.model.RouteDefinition;
 import org.apache.camel.model.RouteDefinitionHelper;
@@ -313,6 +314,8 @@ public abstract class ProcessorReifier<T extends 
ProcessorDefinition<?>> extends
             return new WhenSkipSendToEndpointReifier(route, definition);
         } else if (definition instanceof WhenDefinition) {
             return new WhenReifier(route, definition);
+        } else if (definition instanceof ResumableDefinition) {
+            return new ResumableReifier(route, definition);
         }
         return null;
     }
diff --git 
a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java
 
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java
new file mode 100644
index 0000000..d6f683e
--- /dev/null
+++ 
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ResumableReifier.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.reifier;
+
+import org.apache.camel.Expression;
+import org.apache.camel.Processor;
+import org.apache.camel.ResumeStrategy;
+import org.apache.camel.Route;
+import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.model.ResumableDefinition;
+import org.apache.camel.processor.resume.ResumableProcessor;
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * Reifier that turns a {@link ResumableDefinition} into a {@link ResumableProcessor}.
+ */
+public class ResumableReifier extends ExpressionReifier<ResumableDefinition> {
+    protected ResumableReifier(Route route, ProcessorDefinition<?> definition) {
+        super(route, (ResumableDefinition) definition);
+    }
+
+    /**
+     * Creates the child processor, resolves the resume strategy (which must not be null), sets it on the route and
+     * wraps everything in a {@link ResumableProcessor}.
+     */
+    @Override
+    public Processor createProcessor() throws Exception {
+        final Processor child = createChildProcessor(false);
+
+        final ResumeStrategy strategy = resolveResumeStrategy();
+        ObjectHelper.notNull(strategy, "resumeStrategy", definition);
+        route.setResumeStrategy(strategy);
+
+        final Expression offsetExpression = createExpression(definition.getExpression());
+        return new ResumableProcessor(offsetExpression, strategy, child);
+    }
+
+    /**
+     * Resolves the resume strategy of the definition. When a reference name is configured, the strategy is looked up
+     * by that name and stored on the definition first.
+     *
+     * @return the resume strategy held by the definition
+     */
+    protected ResumeStrategy resolveResumeStrategy() {
+        final String strategyRef = parseString(definition.getResumeStrategyRef());
+
+        if (strategyRef == null) {
+            return definition.getResumeStrategy();
+        }
+
+        final ResumeStrategy lookedUp = mandatoryLookup(strategyRef, ResumeStrategy.class);
+        definition.setResumeStrategy(lookedUp);
+        return definition.getResumeStrategy();
+    }
+}
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
index 62c4560..f12941d 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
@@ -44,6 +44,11 @@ public class FileConsumerResumeFromOffsetStrategyTest 
extends ContextTestSupport
         public void resume() {
             // NO-OP
         }
+
+        @Override
+        public void start() throws Exception {
+            // NO-OP
+        }
     }
 
     @DisplayName("Tests whether we can resume from an offset")
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
index 129399c..d70134e 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
@@ -47,6 +47,11 @@ public class FileConsumerResumeStrategyTest extends 
ContextTestSupport {
                 resumeSet.resumeEach(f -> 
!processedFiles.contains(f.getName()));
             }
         }
+
+        @Override
+        public void start() throws Exception {
+            // NO-OP
+        }
     }
 
     @Test

Reply via email to