This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new c05c928da1b CAMEL-18127: implement adapter auto-configuration in the resume API
c05c928da1b is described below

commit c05c928da1b831c51c5d4617bbf81e0165b2bcf6
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Mon May 30 11:10:17 2022 +0200

    CAMEL-18127: implement adapter auto-configuration in the resume API
---
 .../org/apache/camel/resume/adapter.properties     |  18 ++
 .../resume/{single => }/CaffeineCache.java         |  61 +++++--
 .../caffeine/resume/multi/CaffeineCache.java       |  98 -----------
 .../integration/ConsumeResumeStrategyIT.java       |   2 +-
 .../couchdb/consumer/CouchDbResumable.java         |  15 +-
 .../apache/camel/component/file/FileConsumer.java  |  28 +++-
 .../apache/camel/component/file/GenericFile.java   |  10 +-
 ...ble.java => DirectoryEntriesResumeAdapter.java} |   6 +-
 ...meAdapter.java => FileOffsetResumeAdapter.java} |  18 +-
 .../component/file/consumer/FileResumeAdapter.java |   8 +-
 .../adapters/AbstractFileResumeAdapter.java}       |  37 +++--
 .../DefaultDirectoryEntriesResumeAdapter.java      |  75 +++++++++
 .../adapters/DefaultFileOffsetResumeAdapter.java   |  95 +++++++++++
 .../adapters/DefaultFileSetResumeAdapter.java      |  65 --------
 .../adapters/DefaultGenericFileResumeAdapter.java  |  64 --------
 .../DirectoryEntries.java}                         |  33 ++--
 .../file/consumer/adapters/FileOffset.java}        |  31 ++--
 .../adapters/FileResumeAdapterDelegate.java        |  92 +++++++++++
 .../FileSet.java}                                  |  47 ++++--
 .../org/apache/camel/resume/adapter.properties     |  18 ++
 .../kafka/consumer/support/KafkaResumable.java     |  15 +-
 .../resume/kafka/KafkaResumeStrategy.java          |   5 +-
 .../resume/kafka/MultiNodeKafkaResumeStrategy.java |  84 +++++-----
 .../kafka/SingleNodeKafkaResumeStrategy.java       | 159 +++++++++++-------
 .../KafkaConsumerWithResumeRouteStrategyIT.java    |  16 +-
 .../resume/{ResumeStrategy.java => Cacheable.java} |  32 ++--
 .../org/apache/camel/resume/Deserializable.java    |  56 +++++++
 .../main/java/org/apache/camel/resume/Offset.java  |  14 +-
 .../SingleEntryCache.java => OffsetKey.java}       |  29 ++--
 .../java/org/apache/camel/resume/Resumable.java    |  21 +--
 .../java/org/apache/camel/resume/ResumableSet.java |  81 ---------
 .../org/apache/camel/resume/ResumeAdapter.java     |   3 +
 .../org/apache/camel/resume/ResumeStrategy.java    |  15 ++
 .../java/org/apache/camel/resume/Serializable.java |  82 ++++++++++
 .../resume/UpdatableConsumerResumeStrategy.java    |   4 +-
 .../apache/camel/resume/cache/MultiEntryCache.java |  29 ----
 .../org/apache/camel/resume/cache/ResumeCache.java |  46 +++++-
 .../org/apache/camel/impl/engine/DefaultRoute.java |   6 +-
 .../docs/modules/eips/pages/resume-strategies.adoc |  51 +-----
 .../processor/resume/DelegatingResumeAdapter.java  | 181 ---------------------
 .../processor/resume/ResumableCompletion.java      |  14 +-
 .../processor/resume/TransientResumeStrategy.java  |  10 ++
 .../FileConsumerResumeFromOffsetStrategyTest.java  |  53 ++++--
 .../file/FileConsumerResumeStrategyTest.java       |  36 ++--
 .../apache/camel/support/resume/AdapterHelper.java | 112 +++++++++++++
 .../apache/camel/support/resume/OffsetKeys.java    | 109 +++++++++++++
 .../org/apache/camel/support/resume/Offsets.java   |  24 ++-
 .../apache/camel/support/resume/Resumables.java    |  59 ++++++-
 48 files changed, 1279 insertions(+), 888 deletions(-)

diff --git a/components/camel-atom/src/main/resources/org/apache/camel/resume/adapter.properties b/components/camel-atom/src/main/resources/org/apache/camel/resume/adapter.properties
new file mode 100644
index 00000000000..3504fd3a39f
--- /dev/null
+++ b/components/camel-atom/src/main/resources/org/apache/camel/resume/adapter.properties
@@ -0,0 +1,18 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+adapterClass=org.apache.camel.component.atom.UpdatedDateFilter
diff --git 
a/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/single/CaffeineCache.java
 
b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/CaffeineCache.java
similarity index 58%
rename from 
components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/single/CaffeineCache.java
rename to 
components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/CaffeineCache.java
index d551418cf38..5ec66ddc88b 100644
--- 
a/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/single/CaffeineCache.java
+++ 
b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/CaffeineCache.java
@@ -15,22 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.camel.component.caffeine.resume.single;
+package org.apache.camel.component.caffeine.resume;
 
 import java.util.Optional;
+import java.util.function.BiFunction;
+import java.util.function.Function;
 
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
-import org.apache.camel.resume.cache.SingleEntryCache;
+import org.apache.camel.resume.cache.ResumeCache;
 
 /**
  * This is a simple cache implementation that uses Caffeine to store the resume offsets
  *
  * @param <K> The type of the key to cache
- * @param <V> The type of the value/entry to cache
  */
-public class CaffeineCache<K, V> implements SingleEntryCache<K, V> {
-    private final Cache<K, V> cache;
+public class CaffeineCache<K> implements ResumeCache<K> {
+    private final Cache<K, Object> cache;
     private final long cacheSize;
 
     /**
@@ -48,27 +49,27 @@ public class CaffeineCache<K, V> implements 
SingleEntryCache<K, V> {
      * @param cache     an instance of a pre-constructed cache object
      * @param cacheSize the size of the pre-constructed cache object
      */
-    public CaffeineCache(Cache<K, V> cache, long cacheSize) {
+    public CaffeineCache(Cache<K, Object> cache, long cacheSize) {
         this.cache = cache;
         this.cacheSize = cacheSize;
     }
 
     @Override
-    public boolean contains(K key, V entry) {
+    public boolean contains(K key, Object entry) {
         assert key != null;
-        V cachedEntry = cache.getIfPresent(key);
+        Object cachedEntry = cache.getIfPresent(key);
 
         return entry.equals(cachedEntry);
     }
 
     @Override
-    public void add(K key, V offsetValue) {
+    public void add(K key, Object offsetValue) {
         cache.put(key, offsetValue);
     }
 
     @Override
-    public Optional<V> get(K key) {
-        V entry = cache.getIfPresent(key);
+    public Object get(K key) {
+        Object entry = cache.getIfPresent(key);
 
         if (entry == null) {
             return Optional.empty();
@@ -78,12 +79,42 @@ public class CaffeineCache<K, V> implements 
SingleEntryCache<K, V> {
     }
 
     @Override
-    public boolean isFull() {
-        if (cache.estimatedSize() >= cacheSize) {
-            return true;
+    public <T> T get(K key, Class<T> clazz) {
+        Object entry = cache.getIfPresent(key);
+        if (entry != null) {
+            return clazz.cast(entry);
+        }
+
+        return null;
+    }
+
+    @Override
+    public Object computeIfAbsent(K key, Function<? super K, ? super Object> mapping) {
+        Object entry = cache.getIfPresent(key);
+
+        if (entry == null) {
+            entry = mapping.apply(key);
+            cache.put(key, entry);
+        }
+
+        return entry;
+    }
+
+    @Override
+    public Object computeIfPresent(K key, BiFunction<? super K, ? super Object, ? super Object> remapping) {
+        Object entry = cache.getIfPresent(key);
+
+        if (entry != null) {
+            entry = remapping.apply(key, entry);
+            cache.put(key, entry);
         }
 
-        return false;
+        return entry;
+    }
+
+    @Override
+    public boolean isFull() {
+        return cache.estimatedSize() >= cacheSize;
     }
 
     @Override
diff --git 
a/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/multi/CaffeineCache.java
 
b/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/multi/CaffeineCache.java
deleted file mode 100644
index e775be5d063..00000000000
--- 
a/components/camel-caffeine/src/main/java/org/apache/camel/component/caffeine/resume/multi/CaffeineCache.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.component.caffeine.resume.multi;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
-import org.apache.camel.resume.cache.MultiEntryCache;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A cache that can store multiple key/valued resumables where the value is a 
list containing multiple values
- * 
- * @param <K> the type of the key
- * @param <V> the type of the value
- */
-public class CaffeineCache<K, V> implements MultiEntryCache<K, V> {
-    private static final Logger LOG = 
LoggerFactory.getLogger(CaffeineCache.class);
-
-    private final Cache<K, List<V>> cache;
-    private final long cacheSize;
-
-    /**
-     * Builds a new instance of this object with the given cache size
-     * 
-     * @param cacheSize the size of the cache
-     */
-    public CaffeineCache(long cacheSize) {
-        this(Caffeine.newBuilder().maximumSize(cacheSize).build(), cacheSize);
-    }
-
-    /**
-     * Builds a new instance of this object
-     *
-     * @param cache     an instance of a pre-constructed cache object
-     * @param cacheSize the size of the pre-constructed cache object
-     */
-    public CaffeineCache(Cache<K, List<V>> cache, long cacheSize) {
-        this.cache = cache;
-        this.cacheSize = cacheSize;
-    }
-
-    @Override
-    public long capacity() {
-        return cacheSize;
-    }
-
-    @Override
-    public synchronized void add(K key, V offsetValue) {
-        LOG.trace("Adding entry to the cache (k/v): {}/{}", key, offsetValue);
-        LOG.trace("Adding entry to the cache (k/v) with types: {}/{}", 
key.getClass(), offsetValue.getClass());
-        List<V> entries = cache.get(key, k -> new ArrayList<>());
-
-        entries.add(offsetValue);
-    }
-
-    @Override
-    public synchronized boolean contains(K key, V entry) {
-        final List<V> entries = cache.getIfPresent(key);
-
-        if (entries == null) {
-            return false;
-        }
-
-        boolean ret = entries.contains(entry);
-        LOG.trace("Checking if cache contains key {} with value {} ({})", key, 
entry, ret);
-
-        return ret;
-    }
-
-    @Override
-    public boolean isFull() {
-        if (cache.estimatedSize() >= cacheSize) {
-            return true;
-        }
-
-        return false;
-    }
-
-}
diff --git 
a/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/integration/ConsumeResumeStrategyIT.java
 
b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/integration/ConsumeResumeStrategyIT.java
index 8925c5459bd..cacab287d9d 100644
--- 
a/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/integration/ConsumeResumeStrategyIT.java
+++ 
b/components/camel-couchbase/src/test/java/org/apache/camel/component/couchbase/integration/ConsumeResumeStrategyIT.java
@@ -50,7 +50,7 @@ public class ConsumeResumeStrategyIT extends 
CouchbaseIntegrationTestBase {
         }
     }
 
-    TransientResumeStrategy resumeStrategy = new TransientResumeStrategy(new 
TestCouchbaseResumeAdapter());
+    private final TransientResumeStrategy resumeStrategy = new TransientResumeStrategy(new TestCouchbaseResumeAdapter());
 
     @Test
     public void testQueryForBeers() throws Exception {
diff --git 
a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumable.java
 
b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumable.java
index df8d2e10917..bd81936e55d 100644
--- 
a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumable.java
+++ 
b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumable.java
@@ -19,13 +19,15 @@ package org.apache.camel.component.couchdb.consumer;
 
 import org.apache.camel.component.couchdb.CouchDbClientWrapper;
 import org.apache.camel.resume.Offset;
+import org.apache.camel.resume.OffsetKey;
 import org.apache.camel.resume.Resumable;
+import org.apache.camel.support.resume.OffsetKeys;
 import org.apache.camel.support.resume.Offsets;
 
 /**
  * Wraps the resume data for CouchDb
  */
-public class CouchDbResumable implements Resumable<String, String> {
+public class CouchDbResumable implements Resumable {
     private final CouchDbClientWrapper clientWrapper;
     private String offset;
 
@@ -34,7 +36,6 @@ public class CouchDbResumable implements Resumable<String, 
String> {
         this.offset = offset;
     }
 
-    @Override
     public void updateLastOffset(String offset) {
         this.offset = offset;
     }
@@ -44,11 +45,6 @@ public class CouchDbResumable implements Resumable<String, 
String> {
         return Offsets.of(offset);
     }
 
-    @Override
-    public String getAddressable() {
-        return null;
-    }
-
     /**
      * Gets the client wrapper. Fine for local access, but should be 
restricted for global access on the API
      * 
@@ -57,4 +53,9 @@ public class CouchDbResumable implements Resumable<String, 
String> {
     CouchDbClientWrapper getClientWrapper() {
         return clientWrapper;
     }
+
+    @Override
+    public OffsetKey<?> getOffsetKey() {
+        return OffsetKeys.empty();
+    }
 }
diff --git 
a/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
index 9d730a528a4..95f16969813 100644
--- 
a/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
+++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
@@ -31,10 +31,9 @@ import java.util.Set;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
-import org.apache.camel.component.file.consumer.FileResumeAdapter;
-import org.apache.camel.component.file.consumer.FileResumeSet;
-import org.apache.camel.component.file.consumer.FileSetResumeAdapter;
-import org.apache.camel.component.file.consumer.GenericFileResumeAdapter;
+import org.apache.camel.component.file.consumer.DirectoryEntriesResumeAdapter;
+import org.apache.camel.component.file.consumer.FileOffsetResumeAdapter;
+import org.apache.camel.component.file.consumer.adapters.DirectoryEntries;
 import org.apache.camel.resume.ResumeAdapter;
 import org.apache.camel.resume.ResumeAware;
 import org.apache.camel.resume.ResumeStrategy;
@@ -107,8 +106,9 @@ public class FileConsumer extends GenericFileConsumer<File> 
implements ResumeAwa
 
             if (resumeStrategy != null) {
                 ResumeAdapter adapter = resumeStrategy.getAdapter();
-                if (adapter instanceof GenericFileResumeAdapter) {
-                    ((FileResumeAdapter) adapter).resume(gf);
+                if (adapter instanceof FileOffsetResumeAdapter) {
+                    ((FileOffsetResumeAdapter) adapter).setResumePayload(gf);
+                    adapter.resume();
                 }
             }
 
@@ -178,10 +178,11 @@ public class FileConsumer extends 
GenericFileConsumer<File> implements ResumeAwa
 
         if (resumeStrategy != null) {
             ResumeAdapter adapter = resumeStrategy.getAdapter();
-            if (adapter instanceof FileSetResumeAdapter) {
-                FileResumeSet resumeSet = new FileResumeSet(dirFiles);
+            if (adapter instanceof DirectoryEntriesResumeAdapter) {
+                DirectoryEntries resumeSet = new DirectoryEntries(directory, dirFiles);
 
-                ((FileResumeAdapter) adapter).resume(resumeSet);
+                ((DirectoryEntriesResumeAdapter) adapter).setResumePayload(resumeSet);
+                adapter.resume();
 
                 return resumeSet.resumed();
             }
@@ -315,6 +316,15 @@ public class FileConsumer extends 
GenericFileConsumer<File> implements ResumeAwa
         return 
!file.getFile().getAbsolutePath().equals(file.getAbsoluteFilePath());
     }
 
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        if (resumeStrategy != null) {
+            resumeStrategy.loadCache();
+        }
+    }
+
     @Override
     public ResumeStrategy getResumeStrategy() {
         return resumeStrategy;
diff --git 
a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFile.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFile.java
index 42d52d5cc2f..25249ac20b8 100644
--- 
a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFile.java
+++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFile.java
@@ -23,7 +23,6 @@ import java.util.Map;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.WrappedFile;
-import org.apache.camel.component.file.consumer.GenericFileResumable;
 import org.apache.camel.resume.Offset;
 import org.apache.camel.support.resume.Offsets;
 import org.apache.camel.util.FileUtil;
@@ -35,7 +34,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Generic File. Specific implementations of a file based endpoint need to 
provide a File for transfer.
  */
-public class GenericFile<T> implements WrappedFile<T>, GenericFileResumable<T> 
{
+public class GenericFile<T> implements WrappedFile<T> {
     private static final Logger LOG = 
LoggerFactory.getLogger(GenericFile.class);
 
     private final boolean probeContentType;
@@ -416,21 +415,14 @@ public class GenericFile<T> implements WrappedFile<T>, 
GenericFileResumable<T> {
         this.copyFromAbsoluteFilePath = copyFromAbsoluteFilePath;
     }
 
-    @Override
     public void updateLastOffset(Long offset) {
         this.lastOffset = offset;
     }
 
-    @Override
     public Offset<Long> getLastOffset() {
         return Offsets.of(lastOffset);
     }
 
-    @Override
-    public T getAddressable() {
-        return file;
-    }
-
     /**
      * Fixes the path separator to be according to the protocol
      */
diff --git 
a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/GenericFileResumable.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/DirectoryEntriesResumeAdapter.java
similarity index 81%
rename from 
components/camel-file/src/main/java/org/apache/camel/component/file/consumer/GenericFileResumable.java
rename to 
components/camel-file/src/main/java/org/apache/camel/component/file/consumer/DirectoryEntriesResumeAdapter.java
index 79899514a54..9d1de1297dd 100644
--- 
a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/GenericFileResumable.java
+++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/DirectoryEntriesResumeAdapter.java
@@ -17,8 +17,10 @@
 
 package org.apache.camel.component.file.consumer;
 
-import org.apache.camel.resume.Resumable;
+import org.apache.camel.component.file.consumer.adapters.DirectoryEntries;
 
-public interface GenericFileResumable<T> extends Resumable<T, Long> {
+public interface DirectoryEntriesResumeAdapter {
+    default void setResumePayload(DirectoryEntries fileSet) {
 
+    }
 }
diff --git 
a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileSetResumeAdapter.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileOffsetResumeAdapter.java
similarity index 69%
rename from 
components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileSetResumeAdapter.java
rename to 
components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileOffsetResumeAdapter.java
index 2e1f5e56477..34d13264542 100644
--- 
a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileSetResumeAdapter.java
+++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileOffsetResumeAdapter.java
@@ -17,9 +17,21 @@
 
 package org.apache.camel.component.file.consumer;
 
+import java.io.File;
+
+import org.apache.camel.component.file.GenericFile;
+
 /**
- * Allows the implementation of file adapters for handling resume operations 
for file sets (i.e.: file entries in a
- * directory)
+ * Provides an interface for adapters handling file offsets
  */
-public interface FileSetResumeAdapter extends FileResumeAdapter<FileResumeSet> 
{
+public interface FileOffsetResumeAdapter {
+
+    /**
+     * Sets the resume payload used for the adapter
+     * 
+     * @param genericFile a generic file instance
+     */
+    default void setResumePayload(GenericFile<File> genericFile) {
+
+    }
 }
diff --git 
a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileResumeAdapter.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileResumeAdapter.java
index 25b807cbf40..0b2f9c113d5 100644
--- 
a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileResumeAdapter.java
+++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileResumeAdapter.java
@@ -22,12 +22,6 @@ import org.apache.camel.resume.ResumeAdapter;
 /**
  * Defines resume adapter for consumers of the file component.
  */
-public interface FileResumeAdapter<T> extends ResumeAdapter {
+public interface FileResumeAdapter extends ResumeAdapter {
 
-    /**
-     * Returns the last offset read for the given file.
-     *
-     * @param resumable the resumable file or resumable set to run the resume
-     */
-    void resume(T resumable);
 }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaResumable.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/AbstractFileResumeAdapter.java
similarity index 50%
copy from 
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaResumable.java
copy to 
components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/AbstractFileResumeAdapter.java
index 064b721c8cf..0195a9325f1 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaResumable.java
+++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/AbstractFileResumeAdapter.java
@@ -15,33 +15,40 @@
  * limitations under the License.
  */
 
-package org.apache.camel.component.kafka.consumer.support;
+package org.apache.camel.component.file.consumer.adapters;
 
+import java.io.File;
+
+import org.apache.camel.component.file.consumer.FileResumeAdapter;
+import org.apache.camel.resume.Cacheable;
 import org.apache.camel.resume.Offset;
-import org.apache.camel.resume.Resumable;
-import org.apache.camel.support.resume.Offsets;
+import org.apache.camel.resume.OffsetKey;
+import org.apache.camel.resume.cache.ResumeCache;
 
-public class KafkaResumable implements Resumable<String, String> {
-    private final String partition;
-    private String offset;
+/**
+ * Base shared class for the file resume adapters
+ */
+abstract class AbstractFileResumeAdapter implements FileResumeAdapter, Cacheable {
+    protected ResumeCache<File> cache;
 
-    public KafkaResumable(String partition, String offset) {
-        this.partition = partition;
-        this.offset = offset;
+    protected AbstractFileResumeAdapter() {
     }
 
+    @SuppressWarnings("unchecked")
     @Override
-    public void updateLastOffset(String offset) {
-        this.offset = offset;
+    public final void setCache(ResumeCache<?> cache) {
+        this.cache = (ResumeCache<File>) cache;
     }
 
     @Override
-    public Offset<String> getLastOffset() {
-        return Offsets.of(offset);
+    public final ResumeCache<?> getCache() {
+        return cache;
     }
 
     @Override
-    public String getAddressable() {
-        return partition;
+    public final boolean add(OffsetKey<?> key, Offset<?> offset) {
+        return add(key.getKey(), offset.offset());
     }
+
+    protected abstract boolean add(Object key, Object offset);
 }
diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultDirectoryEntriesResumeAdapter.java b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultDirectoryEntriesResumeAdapter.java
new file mode 100644
index 00000000000..09e33457adb
--- /dev/null
+++ b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultDirectoryEntriesResumeAdapter.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.file.consumer.adapters;
+
+import java.io.File;
+
+import org.apache.camel.component.file.consumer.DirectoryEntriesResumeAdapter;
+import org.apache.camel.component.file.consumer.FileResumeAdapter;
+
+/**
+ * An implementation of the {@link FileResumeAdapter} that can be used for resume operations for the file component.
+ * This one can be used to manage the resume operations for files within a directory.
+ */
+class DefaultDirectoryEntriesResumeAdapter extends AbstractFileResumeAdapter implements DirectoryEntriesResumeAdapter {
+    private DirectoryEntries fileSet;
+
+    private boolean notProcessed(File directory, File file) {
+        FileSet cached = cache.get(directory, FileSet.class);
+        if (cached == null) {
+            return true;
+        }
+
+        return !cached.contains(file);
+    }
+
+    @Override
+    public void setResumePayload(DirectoryEntries fileSet) {
+        assert fileSet != null;
+
+        this.fileSet = fileSet;
+    }
+
+    protected boolean add(Object key, Object offset) {
+        if (offset instanceof File) {
+            FileSet fileSet = (FileSet) cache.computeIfAbsent((File) key, k -> new FileSet());
+
+            fileSet.update((File) offset);
+        } else {
+            throw new UnsupportedOperationException("This adapter cannot be used for file offsets");
+        }
+
+        // For this one it's safe to always continue processing
+        return true;
+    }
+
+    private void resumeDirectoryEntries() {
+        DirectoryEntries.doResume(fileSet, f -> notProcessed(fileSet.getDirectory(), f));
+    }
+
+    @Override
+    public void resume() {
+        resumeDirectoryEntries();
+    }
+
+    public void deserializeFileEntry(File keyObj, File valueObj) {
+        FileSet fileSet = (FileSet) cache.computeIfAbsent(keyObj, obj -> new FileSet());
+
+        fileSet.update(valueObj);
+    }
+}
diff --git 
a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultFileOffsetResumeAdapter.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultFileOffsetResumeAdapter.java
new file mode 100644
index 00000000000..51ac52f7933
--- /dev/null
+++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultFileOffsetResumeAdapter.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.file.consumer.adapters;
+
+import java.io.File;
+
+import org.apache.camel.component.file.GenericFile;
+import org.apache.camel.component.file.consumer.FileOffsetResumeAdapter;
+import org.apache.camel.component.file.consumer.FileResumeAdapter;
+import org.apache.camel.resume.Offset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link FileResumeAdapter} implementation for the file component that handles a single file: it looks up the
+ * offset cached for the file and applies it to the file given as resume payload.
+ */
+class DefaultFileOffsetResumeAdapter extends AbstractFileResumeAdapter implements FileOffsetResumeAdapter {
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultFileOffsetResumeAdapter.class);
+
+    // The file to be resumed; null until setResumePayload is called
+    private GenericFile<File> genericFile;
+
+    @Override
+    public void setResumePayload(GenericFile<File> genericFile) {
+        assert genericFile != null;
+        this.genericFile = genericFile;
+    }
+
+    /**
+     * Stores an offset for a file in the cache.
+     *
+     * @param  key                           the file (must be a {@link File})
+     * @param  offset                        the offset to store
+     * @return                               always true
+     * @throws UnsupportedOperationException if the offset is not a {@link Long}
+     */
+    public boolean add(Object key, Object offset) {
+        if (!(offset instanceof Long)) {
+            throw new UnsupportedOperationException("This adapter cannot be used for directory entries");
+        }
+
+        deserializeFileOffset((File) key, (Long) offset);
+
+        // For this one it's safe to always continue processing
+        return true;
+    }
+
+    /**
+     * Applies the cached offset, if any, to the current payload. Nothing happens when there is no payload, no cached
+     * entry for its file, or the cached entry carries no value.
+     */
+    private void resumeFileOffsets() {
+        if (genericFile != null) {
+            final Offset<?> cachedOffset = cache.get(genericFile.getFile(), Offset.class);
+            final Object value = cachedOffset != null ? cachedOffset.offset() : null;
+
+            if (value instanceof Long) {
+                genericFile.updateLastOffset((Long) value);
+            } else if (value != null) {
+                // This should never happen
+                LOG.warn("Cannot perform a resume operation of an object of unhandled type: {}", value.getClass());
+            }
+        }
+    }
+
+    @Override
+    public void resume() {
+        resumeFileOffsets();
+    }
+
+    /**
+     * Updates the {@link FileOffset} obtained from the cache's computeIfAbsent for the given file.
+     *
+     * @param keyObj   the file used as the cache key
+     * @param valueObj the offset value to set
+     */
+    public void deserializeFileOffset(File keyObj, Long valueObj) {
+        FileOffset cachedOffset = (FileOffset) cache.computeIfAbsent(keyObj, obj -> new FileOffset());
+
+        cachedOffset.update(valueObj);
+    }
+}
diff --git 
a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultFileSetResumeAdapter.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultFileSetResumeAdapter.java
deleted file mode 100644
index 9a1f7f49f76..00000000000
--- 
a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultFileSetResumeAdapter.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.component.file.consumer.adapters;
-
-import java.io.File;
-
-import org.apache.camel.component.file.consumer.FileResumeSet;
-import org.apache.camel.component.file.consumer.FileSetResumeAdapter;
-import org.apache.camel.resume.cache.MultiEntryCache;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * An implementation of the {@link FileSetResumeAdapter} that can be used for 
resume operations for multiple files. For
- * instance, this can be used to manage the resume operations for files within 
a directory.
- */
-public class DefaultFileSetResumeAdapter implements FileSetResumeAdapter {
-    private static final Logger LOG = 
LoggerFactory.getLogger(DefaultFileSetResumeAdapter.class);
-
-    private final MultiEntryCache<File, File> cache;
-
-    public DefaultFileSetResumeAdapter(MultiEntryCache<File, File> cache) {
-        this.cache = cache;
-    }
-
-    private boolean notProcessed(File file) {
-        File key = file.getParentFile();
-
-        // if the file is in the cache, then it's already processed
-        boolean ret = !cache.contains(key, file);
-        return ret;
-    }
-
-    @Override
-    public void resume(FileResumeSet resumable) {
-        if (resumable != null) {
-            resumable.resumeEach(this::notProcessed);
-            if (resumable.hasResumables()) {
-                LOG.debug("There's {} files to still to be processed", 
resumable.resumed().length);
-            }
-        } else {
-            LOG.trace("Nothing to resume");
-        }
-    }
-
-    @Override
-    public void resume() {
-        // NO-OP
-    }
-}
diff --git 
a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultGenericFileResumeAdapter.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultGenericFileResumeAdapter.java
deleted file mode 100644
index 54e339fe73e..00000000000
--- 
a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DefaultGenericFileResumeAdapter.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.component.file.consumer.adapters;
-
-import java.io.File;
-import java.util.Optional;
-
-import org.apache.camel.component.file.consumer.GenericFileResumable;
-import org.apache.camel.component.file.consumer.GenericFileResumeAdapter;
-import org.apache.camel.resume.cache.SingleEntryCache;
-
-/**
- * An implementation of the {@link GenericFileResumeAdapter} that can be used 
to handle resume operations for file
- * offsets (where the offsets are of Long format).
- */
-public class DefaultGenericFileResumeAdapter implements 
GenericFileResumeAdapter {
-    private final SingleEntryCache<File, Long> cache;
-
-    public DefaultGenericFileResumeAdapter(SingleEntryCache<File, Long> cache) 
{
-        this.cache = cache;
-    }
-
-    private Optional<Long> getLastOffset(GenericFileResumable<File> resumable) 
{
-        final File addressable = resumable.getAddressable();
-        return cache.get(addressable);
-    }
-
-    @Override
-    public Optional<Long> getLastOffset(File addressable) {
-        return cache.get(addressable);
-    }
-
-    @Override
-    public void resume(GenericFileResumable<File> resumable) {
-        final Optional<Long> lastOffsetOpt = getLastOffset(resumable);
-
-        if (!lastOffsetOpt.isPresent()) {
-            return;
-        }
-
-        final long lastOffset = lastOffsetOpt.get();
-        resumable.updateLastOffset(lastOffset);
-    }
-
-    @Override
-    public void resume() {
-
-    }
-}
diff --git 
a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileResumeSet.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DirectoryEntries.java
similarity index 70%
rename from 
components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileResumeSet.java
rename to 
components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DirectoryEntries.java
index 41b1751223c..5795dc5042e 100644
--- 
a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/FileResumeSet.java
+++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/DirectoryEntries.java
@@ -15,35 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.camel.component.file.consumer;
+package org.apache.camel.component.file.consumer.adapters;
 
 import java.io.File;
 import java.util.Objects;
 import java.util.function.Predicate;
 
-import org.apache.camel.resume.ResumableSet;
+import org.apache.camel.support.resume.Resumables;
 
 /**
  * This contains the input/output file set for resume operations.
  */
-public final class FileResumeSet implements ResumableSet<File> {
+public final class DirectoryEntries {
+    private final File directory;
     private final File[] inputFiles;
     private File[] outputFiles;
 
-    public FileResumeSet(File[] inputFiles) {
+    public DirectoryEntries(File directory, File[] inputFiles) {
+        this.directory = directory;
         this.inputFiles = Objects.requireNonNull(inputFiles,
                 "A list of input files must be provided for the resume info");
     }
 
-    /**
-     * Iterates over the set of input files checking if they should be resumed 
or not
-     *
-     * @param resumableCheck a checker method that returns true if the file 
should be resumed or false otherwise
-     */
-    public void resumeEach(Predicate<File> resumableCheck) {
-        this.outputFiles = resumeEach(inputFiles, resumableCheck);
-    }
-
     /**
      * Gets the files that should be resumed
      *
@@ -65,4 +58,18 @@ public final class FileResumeSet implements 
ResumableSet<File> {
 
         return false;
     }
+
+    /**
+     * Gets the directory given to the constructor of this instance.
+     *
+     * @return the directory
+     */
+    public File getDirectory() {
+        return directory;
+    }
+
+    /**
+     * Sets the output files of this instance.
+     *
+     * @param resumed the files to store as output
+     */
+    public void setOutputFiles(File[] resumed) {
+        this.outputFiles = resumed;
+    }
+
+    /**
+     * Runs the input files of the given entries through {@code Resumables.resumeEach} with the given check and stores
+     * the returned files as the output files of those entries.
+     *
+     * @param directoryEntries the entries holding the input files and receiving the output files
+     * @param resumableCheck   the check applied to the input files
+     */
+    public static void doResume(DirectoryEntries directoryEntries, Predicate<File> resumableCheck) {
+        directoryEntries.setOutputFiles(Resumables.resumeEach(directoryEntries.inputFiles, resumableCheck));
+    }
 }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaOffset.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/FileOffset.java
similarity index 63%
rename from 
components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaOffset.java
rename to 
components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/FileOffset.java
index 11a0879a59c..26ec8477d64 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaOffset.java
+++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/FileOffset.java
@@ -15,25 +15,32 @@
  * limitations under the License.
  */
 
-package org.apache.camel.component.kafka.consumer.support;
+package org.apache.camel.component.file.consumer.adapters;
 
 import org.apache.camel.resume.Offset;
-import org.apache.camel.util.KeyValueHolder;
 
-/**
- * Offset class for Kafka
- */
-public class KafkaOffset implements Offset<KeyValueHolder<String, String>> {
-    private final String topicPartition;
-    private final String offset;
+public class FileOffset implements Offset<Long> {
+    private Long offset;
+
+    public FileOffset() {
+    }
 
-    public KafkaOffset(String topicPartition, String offset) {
-        this.topicPartition = topicPartition;
+    public FileOffset(Long offset) {
         this.offset = offset;
     }
 
     @Override
-    public KeyValueHolder<String, String> offset() {
-        return new KeyValueHolder<>(topicPartition, offset);
+    public void update(Long offset) {
+        this.offset = offset;
+    }
+
+    @Deprecated
+    public Object getLastOffset() {
+        return offset;
+    }
+
+    @Override
+    public Long offset() {
+        return offset;
     }
 }
diff --git 
a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/FileResumeAdapterDelegate.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/FileResumeAdapterDelegate.java
new file mode 100644
index 00000000000..f75e51d199b
--- /dev/null
+++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/FileResumeAdapterDelegate.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.file.consumer.adapters;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+
+import org.apache.camel.component.file.GenericFile;
+import org.apache.camel.component.file.consumer.DirectoryEntriesResumeAdapter;
+import org.apache.camel.component.file.consumer.FileOffsetResumeAdapter;
+import org.apache.camel.component.file.consumer.FileResumeAdapter;
+import org.apache.camel.resume.Cacheable;
+import org.apache.camel.resume.Deserializable;
+import org.apache.camel.resume.Offset;
+import org.apache.camel.resume.OffsetKey;
+import org.apache.camel.resume.cache.ResumeCache;
+
+/**
+ * A resume adapter for the file component that forwards each call to a file-offset adapter, a directory-entries
+ * adapter, or both.
+ */
+public class FileResumeAdapterDelegate
+        implements FileResumeAdapter, Cacheable, Deserializable, FileOffsetResumeAdapter, DirectoryEntriesResumeAdapter {
+    private final DefaultDirectoryEntriesResumeAdapter directoryEntriesResumeAdapter
+            = new DefaultDirectoryEntriesResumeAdapter();
+    private final DefaultFileOffsetResumeAdapter fileOffsetResumeAdapter = new DefaultFileOffsetResumeAdapter();
+
+    @Override
+    public void setResumePayload(GenericFile<File> genericFile) {
+        fileOffsetResumeAdapter.setResumePayload(genericFile);
+    }
+
+    @Override
+    public void setResumePayload(DirectoryEntries fileSet) {
+        directoryEntriesResumeAdapter.setResumePayload(fileSet);
+    }
+
+    @Override
+    public boolean add(OffsetKey<?> key, Offset<?> offset) {
+        // Long values go to the file-offset adapter; everything else goes to the directory-entries adapter
+        if (!(offset.offset() instanceof Long)) {
+            return directoryEntriesResumeAdapter.add(key, offset);
+        }
+
+        return fileOffsetResumeAdapter.add(key, offset);
+    }
+
+    @Override
+    public void setCache(ResumeCache<?> cache) {
+        // Both adapters are given the same cache instance
+        fileOffsetResumeAdapter.setCache(cache);
+        directoryEntriesResumeAdapter.setCache(cache);
+    }
+
+    @Override
+    public ResumeCache<?> getCache() {
+        return fileOffsetResumeAdapter.getCache();
+    }
+
+    @Override
+    public boolean deserialize(ByteBuffer keyBuffer, ByteBuffer valueBuffer) {
+        final Object keyObj = deserializeObject(keyBuffer);
+        final Object valueObj = deserializeObject(valueBuffer);
+
+        // The value type selects the adapter; values that are neither File nor Long are ignored
+        if (valueObj instanceof File) {
+            directoryEntriesResumeAdapter.deserializeFileEntry((File) keyObj, (File) valueObj);
+        } else if (valueObj instanceof Long) {
+            fileOffsetResumeAdapter.deserializeFileOffset((File) keyObj, (Long) valueObj);
+        }
+
+        return true;
+    }
+
+    @Override
+    public void resume() {
+        fileOffsetResumeAdapter.resume();
+        directoryEntriesResumeAdapter.resume();
+    }
+}
diff --git 
a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/GenericFileResumeAdapter.java
 
b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/FileSet.java
similarity index 55%
rename from 
components/camel-file/src/main/java/org/apache/camel/component/file/consumer/GenericFileResumeAdapter.java
rename to 
components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/FileSet.java
index 8f7c1f2900c..e4502fc7c35 100644
--- 
a/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/GenericFileResumeAdapter.java
+++ 
b/components/camel-file/src/main/java/org/apache/camel/component/file/consumer/adapters/FileSet.java
@@ -15,20 +15,41 @@
  * limitations under the License.
  */
 
-package org.apache.camel.component.file.consumer;
+package org.apache.camel.component.file.consumer.adapters;
 
 import java.io.File;
-import java.util.Optional;
+import java.util.ArrayList;
+import java.util.List;
 
-/**
- * Allows the implementation of file adapters for handling resume operations 
for generic files
- */
-public interface GenericFileResumeAdapter extends 
FileResumeAdapter<GenericFileResumable<File>> {
-    /**
-     * Gets the last offset for the given file
-     *
-     * @param  addressable the file instance
-     * @return             An Optional with the offset value
-     */
-    Optional<Long> getLastOffset(File addressable);
+import org.apache.camel.resume.Offset;
+
+/**
+ * An {@link Offset} that collects files: every update appends a file to an internal list, and {@link #offset()}
+ * always returns null.
+ */
+public class FileSet implements Offset<File> {
+    private final List<File> files = new ArrayList<>();
+
+    /**
+     * Creates an instance with no files.
+     */
+    public FileSet() {
+
+    }
+
+    /**
+     * Creates an instance containing the given file.
+     *
+     * @param offset the first file of the set
+     */
+    public FileSet(File offset) {
+        this.files.add(offset);
+    }
+
+    /**
+     * Appends a file to the collected files.
+     *
+     * @param offset the file to append
+     */
+    @Override
+    public void update(File offset) {
+        this.files.add(offset);
+    }
+
+    /**
+     * Checks whether the given object is among the collected files.
+     *
+     * @param  o the object to look for
+     * @return   true if the list of files contains it
+     */
+    public boolean contains(Object o) {
+        return this.files.contains(o);
+    }
+
+    /**
+     * Returns the list of collected files.
+     */
+    @Deprecated
+    public Object getLastOffset() {
+        return this.files;
+    }
+
+    @Override
+    public File offset() {
+        return null;
+    }
+}
diff --git 
a/components/camel-file/src/main/resources/org/apache/camel/resume/adapter.properties
 
b/components/camel-file/src/main/resources/org/apache/camel/resume/adapter.properties
new file mode 100644
index 00000000000..bb7eec9fbd3
--- /dev/null
+++ 
b/components/camel-file/src/main/resources/org/apache/camel/resume/adapter.properties
@@ -0,0 +1,18 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+adapterClass=org.apache.camel.component.file.consumer.adapters.FileResumeAdapterDelegate
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaResumable.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaResumable.java
index 064b721c8cf..c49d3e085bd 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaResumable.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaResumable.java
@@ -18,30 +18,27 @@
 package org.apache.camel.component.kafka.consumer.support;
 
 import org.apache.camel.resume.Offset;
+import org.apache.camel.resume.OffsetKey;
 import org.apache.camel.resume.Resumable;
+import org.apache.camel.support.resume.OffsetKeys;
 import org.apache.camel.support.resume.Offsets;
 
-public class KafkaResumable implements Resumable<String, String> {
+public class KafkaResumable implements Resumable {
     private final String partition;
-    private String offset;
+    private final String offset;
 
     public KafkaResumable(String partition, String offset) {
         this.partition = partition;
         this.offset = offset;
     }
 
-    @Override
-    public void updateLastOffset(String offset) {
-        this.offset = offset;
-    }
-
     @Override
     public Offset<String> getLastOffset() {
         return Offsets.of(offset);
     }
 
     @Override
-    public String getAddressable() {
-        return partition;
+    public OffsetKey<?> getOffsetKey() {
+        return OffsetKeys.unmodifiableOf(partition);
     }
 }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategy.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategy.java
index a1614d4bfc2..9951e09303b 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategy.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/KafkaResumeStrategy.java
@@ -24,9 +24,8 @@ import 
org.apache.camel.resume.UpdatableConsumerResumeStrategy;
 /**
  * Base interface for resume strategies that publish the offsets to a Kafka 
topic
  * 
- * @param <K> the type of key
- * @param <V> the type of the value
+ * @param <T> the type of resumable
  */
-public interface KafkaResumeStrategy<K, V> extends 
UpdatableConsumerResumeStrategy<K, V, Resumable<K, V>>, ResumeStrategy {
+public interface KafkaResumeStrategy<T extends Resumable> extends 
UpdatableConsumerResumeStrategy<T>, ResumeStrategy {
 
 }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java
index e9974cac4e8..7ece1b23699 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java
@@ -17,16 +17,19 @@
 
 package org.apache.camel.processor.resume.kafka;
 
+import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
-import org.apache.camel.resume.ResumeAdapter;
-import org.apache.camel.resume.cache.ResumeCache;
+import org.apache.camel.resume.Deserializable;
+import org.apache.camel.resume.Resumable;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,9 +39,8 @@ import org.slf4j.LoggerFactory;
  * integrations. This is suitable, for instance, when using clusters with the 
master component.
  *
  * @param <K> the type of key
- * @param <V> the type of the value
  */
-public class MultiNodeKafkaResumeStrategy<K, V> extends 
SingleNodeKafkaResumeStrategy<K, V> {
+public class MultiNodeKafkaResumeStrategy<K extends Resumable> extends 
SingleNodeKafkaResumeStrategy<K> {
     private static final Logger LOG = 
LoggerFactory.getLogger(MultiNodeKafkaResumeStrategy.class);
     private final ExecutorService executorService;
 
@@ -47,29 +49,23 @@ public class MultiNodeKafkaResumeStrategy<K, V> extends 
SingleNodeKafkaResumeStr
      * 
      * @param bootstrapServers the address of the Kafka broker
      * @param topic            the topic where to publish the offsets
-     * @param resumeCache      a cache instance where to store the offsets 
locally for faster access
-     * @param resumeAdapter    the component-specific resume adapter
      */
-    public MultiNodeKafkaResumeStrategy(String bootstrapServers, String topic, 
ResumeCache<K, V> resumeCache,
-                                        ResumeAdapter resumeAdapter) {
+    public MultiNodeKafkaResumeStrategy(String bootstrapServers, String topic) 
{
         // just in case users don't want to provide their own worker thread 
pool
-        this(bootstrapServers, topic, resumeCache, resumeAdapter, 
Executors.newSingleThreadExecutor());
+        this(bootstrapServers, topic, Executors.newSingleThreadExecutor());
     }
 
     /**
      * Builds an instance of this class
      *
-     * @param bootstrapServers
+     * @param bootstrapServers the address of the Kafka broker
      * @param topic            the topic where to publish the offsets
-     * @param resumeCache      a cache instance where to store the offsets 
locally for faster access
-     * @param resumeAdapter    the component-specific resume adapter
      * @param executorService  an executor service that will run a separate 
thread for periodically refreshing the
      *                         offsets
      */
 
-    public MultiNodeKafkaResumeStrategy(String bootstrapServers, String topic, 
ResumeCache<K, V> resumeCache,
-                                        ResumeAdapter resumeAdapter, 
ExecutorService executorService) {
-        super(bootstrapServers, topic, resumeCache, resumeAdapter);
+    public MultiNodeKafkaResumeStrategy(String bootstrapServers, String topic, 
ExecutorService executorService) {
+        super(bootstrapServers, topic);
 
         // We need to keep refreshing the cache
         this.executorService = executorService;
@@ -80,36 +76,56 @@ public class MultiNodeKafkaResumeStrategy<K, V> extends 
SingleNodeKafkaResumeStr
      * Builds an instance of this class
      *
      * @param topic          the topic where to publish the offsets
-     * @param resumeCache    a cache instance where to store the offsets 
locally for faster access
-     * @param resumeAdapter  the component-specific resume adapter
      * @param producerConfig the set of properties to be used by the Kafka 
producer within this class
      * @param consumerConfig the set of properties to be used by the Kafka 
consumer within this class
      */
-    public MultiNodeKafkaResumeStrategy(String topic, ResumeCache<K, V> 
resumeCache, ResumeAdapter resumeAdapter,
-                                        Properties producerConfig, Properties 
consumerConfig) {
-        this(topic, resumeCache, resumeAdapter, producerConfig, 
consumerConfig, Executors.newSingleThreadExecutor());
+    public MultiNodeKafkaResumeStrategy(String topic, Properties 
producerConfig, Properties consumerConfig) {
+        this(topic, producerConfig, consumerConfig, 
Executors.newSingleThreadExecutor());
     }
 
     /**
      * Builds an instance of this class
      *
      * @param topic           the topic where to publish the offsets
-     * @param resumeCache     a cache instance where to store the offsets 
locally for faster access
-     * @param resumeAdapter   the component-specific resume adapter
      * @param producerConfig  the set of properties to be used by the Kafka 
producer within this class
      * @param consumerConfig  the set of properties to be used by the Kafka 
consumer within this class
      * @param executorService an executor service that will run a separate 
thread for periodically refreshing the
      *                        offsets
      */
 
-    public MultiNodeKafkaResumeStrategy(String topic, ResumeCache<K, V> 
resumeCache, ResumeAdapter resumeAdapter,
-                                        Properties producerConfig, Properties 
consumerConfig, ExecutorService executorService) {
-        super(topic, resumeCache, resumeAdapter, producerConfig, 
consumerConfig);
+    public MultiNodeKafkaResumeStrategy(String topic, Properties 
producerConfig, Properties consumerConfig,
+                                        ExecutorService executorService) {
+        super(topic, producerConfig, consumerConfig);
 
         this.executorService = executorService;
         executorService.submit(() -> refresh());
     }
 
+    /**
+     * Polls using the consumer returned by {@code getConsumer()}.
+     */
+    protected void poll() {
+        poll(getConsumer());
+    }
+
+    /**
+     * Reads records through the given consumer and hands the key and value of each one to the adapter for
+     * deserialization. Stops as soon as a consume call returns no records. Records with a null key or a null value
+     * are skipped.
+     *
+     * @param consumer the Kafka consumer to read the records from
+     */
+    protected void poll(Consumer<byte[], byte[]> consumer) {
+        Deserializable deserializable = (Deserializable) getAdapter();
+
+        ConsumerRecords<byte[], byte[]> records = consume(10, consumer);
+        while (!records.isEmpty()) {
+            for (ConsumerRecord<byte[], byte[]> record : records) {
+                byte[] key = record.key();
+                byte[] value = record.value();
+
+                // ByteBuffer.wrap throws on a null array, which would abort the whole polling loop
+                if (key == null || value == null) {
+                    LOG.warn("Skipping a record with a null key or value read from Kafka");
+                    continue;
+                }
+
+                LOG.trace("Read from Kafka: {}", value);
+
+                deserializable.deserialize(ByteBuffer.wrap(key), ByteBuffer.wrap(value));
+            }
+
+            records = consume(10, consumer);
+        }
+    }
+
     /**
      * Launch a thread to refresh the offsets periodically
      */
@@ -119,22 +135,10 @@ public class MultiNodeKafkaResumeStrategy<K, V> extends 
SingleNodeKafkaResumeStr
             Properties prop = (Properties) getConsumerConfig().clone();
             prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
UUID.randomUUID().toString());
 
-            Consumer<K, V> consumer = new KafkaConsumer<>(prop);
-
-            consumer.subscribe(Collections.singletonList(getTopic()));
-
-            while (true) {
-                var records = consumer.poll(getPollDuration());
-                if (records.isEmpty()) {
-                    continue;
-                }
-
-                for (var record : records) {
-                    V value = record.value();
+            try (Consumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(prop)) {
+                consumer.subscribe(Collections.singletonList(getTopic()));
 
-                    LOG.trace("Read from Kafka: {}", value);
-                    getResumeCache().add(record.key(), record.value());
-                }
+                poll(consumer);
             }
         } catch (Exception e) {
             LOG.error("Error while refreshing the local cache: {}", 
e.getMessage(), e);
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
index 0a14bb481f6..0274fbd2e30 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/SingleNodeKafkaResumeStrategy.java
@@ -18,6 +18,7 @@
 package org.apache.camel.processor.resume.kafka;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
@@ -28,6 +29,11 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.resume.Cacheable;
+import org.apache.camel.resume.Deserializable;
+import org.apache.camel.resume.Offset;
+import org.apache.camel.resume.OffsetKey;
 import org.apache.camel.resume.Resumable;
 import org.apache.camel.resume.ResumeAdapter;
 import org.apache.camel.resume.cache.ResumeCache;
@@ -45,8 +51,8 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,54 +60,45 @@ import org.slf4j.LoggerFactory;
  * A resume strategy that publishes offsets to a Kafka topic. This resume 
strategy is suitable for single node
  * integrations. For multi-node integrations (i.e: using clusters with the 
master component check
  * {@link MultiNodeKafkaResumeStrategy}.
- * 
- * @param <K> the type of key
- * @param <V> the type of the value
+ *
  */
-public class SingleNodeKafkaResumeStrategy<K, V> implements 
KafkaResumeStrategy<K, V> {
+public class SingleNodeKafkaResumeStrategy<T extends Resumable> implements 
KafkaResumeStrategy<T> {
     private static final Logger LOG = 
LoggerFactory.getLogger(SingleNodeKafkaResumeStrategy.class);
 
     private final String topic;
 
-    private Consumer<K, V> consumer;
-    private Producer<K, V> producer;
+    private Consumer<byte[], byte[]> consumer;
+    private Producer<byte[], byte[]> producer;
     private Duration pollDuration = Duration.ofSeconds(1);
 
     private final Queue<RecordError> producerErrors = new 
ConcurrentLinkedQueue<>();
-    private final ResumeCache<K, V> resumeCache;
+
     private boolean subscribed;
     private final Properties producerConfig;
     private final Properties consumerConfig;
-    private ResumeAdapter resumeAdapter;
+    private ResumeAdapter adapter;
 
     /**
      * Builds an instance of this class
      * 
      * @param bootstrapServers the address of the Kafka broker
      * @param topic            the topic where to publish the offsets
-     * @param resumeCache      a cache instance where to store the offsets 
locally for faster access
-     * @param resumeAdapter    the component-specific resume adapter
+     *
      */
-    public SingleNodeKafkaResumeStrategy(String bootstrapServers, String 
topic, ResumeCache<K, V> resumeCache,
-                                         ResumeAdapter resumeAdapter) {
-        this(topic, resumeCache, resumeAdapter, 
createProducer(bootstrapServers), createConsumer(bootstrapServers));
+    public SingleNodeKafkaResumeStrategy(String bootstrapServers, String 
topic) {
+        this(topic, createProducer(bootstrapServers), 
createConsumer(bootstrapServers));
     }
 
     /**
      * Builds an instance of this class
      *
      * @param topic          the topic where to publish the offsets
-     * @param resumeCache    a cache instance where to store the offsets 
locally for faster access
-     * @param resumeAdapter  the component-specific resume adapter
      * @param producerConfig the set of properties to be used by the Kafka 
producer within this class
      * @param consumerConfig the set of properties to be used by the Kafka 
consumer within this class
      */
-    public SingleNodeKafkaResumeStrategy(String topic, ResumeCache<K, V> 
resumeCache, ResumeAdapter resumeAdapter,
-                                         Properties producerConfig,
+    public SingleNodeKafkaResumeStrategy(String topic, Properties 
producerConfig,
                                          Properties consumerConfig) {
         this.topic = ObjectHelper.notNull(topic, "The topic must not be null");
-        this.resumeCache = resumeCache;
-        this.resumeAdapter = resumeAdapter;
         this.producerConfig = producerConfig;
         this.consumerConfig = consumerConfig;
 
@@ -117,8 +114,8 @@ public class SingleNodeKafkaResumeStrategy<K, V> implements 
KafkaResumeStrategy<
     public static Properties createProducer(String bootstrapServers) {
         Properties config = new Properties();
 
-        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
-        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
+        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
+        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
 
         StringHelper.notEmpty(bootstrapServers, "bootstrapServers");
         config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
@@ -135,8 +132,8 @@ public class SingleNodeKafkaResumeStrategy<K, V> implements 
KafkaResumeStrategy<
     public static Properties createConsumer(String bootstrapServers) {
         Properties config = new Properties();
 
-        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
-        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
+        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
 
         StringHelper.notEmpty(bootstrapServers, "bootstrapServers");
         config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
@@ -160,8 +157,8 @@ public class SingleNodeKafkaResumeStrategy<K, V> implements 
KafkaResumeStrategy<
      * @throws InterruptedException
      *
      */
-    protected void produce(K key, V message) throws ExecutionException, 
InterruptedException {
-        ProducerRecord<K, V> record = new ProducerRecord<>(topic, key, 
message);
+    protected void produce(byte[] key, byte[] message) throws 
ExecutionException, InterruptedException {
+        ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, 
key, message);
 
         producer.send(record, (recordMetadata, e) -> {
             if (e != null) {
@@ -171,16 +168,27 @@ public class SingleNodeKafkaResumeStrategy<K, V> 
implements KafkaResumeStrategy<
         });
     }
 
+    protected void doAdd(OffsetKey<?> key, Offset<?> offsetValue) {
+        if (adapter instanceof Cacheable) {
+            Cacheable cacheable = (Cacheable) adapter;
+
+            cacheable.add(key, offsetValue);
+        }
+    }
+
     @Override
-    public void updateLastOffset(Resumable<K, V> offset) throws Exception {
-        K key = offset.getAddressable();
-        V offsetValue = offset.getLastOffset().offset();
+    public void updateLastOffset(T offset) throws Exception {
+        OffsetKey<?> key = offset.getOffsetKey();
+        Offset<?> offsetValue = offset.getLastOffset();
+
+        LOG.debug("Updating offset on Kafka with key {} to {}", key.getKey(), 
offsetValue.offset());
 
-        LOG.debug("Updating offset on Kafka with key {} to {}", key, 
offsetValue);
+        ByteBuffer keyBuffer = key.serialize();
+        ByteBuffer valueBuffer = offsetValue.serialize();
 
-        produce(key, offsetValue);
+        produce(keyBuffer.array(), valueBuffer.array());
 
-        resumeCache.add(key, offsetValue);
+        doAdd(key, offsetValue);
     }
 
     /**
@@ -188,12 +196,23 @@ public class SingleNodeKafkaResumeStrategy<K, V> 
implements KafkaResumeStrategy<
      * 
      * @throws Exception
      */
-    protected void loadCache() throws Exception {
+    public void loadCache() throws Exception {
         subscribe();
 
         LOG.debug("Loading records from topic {}", topic);
 
-        ConsumerRecords<K, V> records;
+        if (!(adapter instanceof Deserializable)) {
+            throw new RuntimeCamelException("Cannot load data for an adapter 
that is not deserializable");
+        }
+        poll();
+
+        unsubscribe();
+    }
+
+    protected void poll() {
+        Deserializable deserializable = (Deserializable) adapter;
+
+        ConsumerRecords<byte[], byte[]> records;
         do {
             records = consume();
 
@@ -201,19 +220,16 @@ public class SingleNodeKafkaResumeStrategy<K, V> 
implements KafkaResumeStrategy<
                 break;
             }
 
-            for (ConsumerRecord<K, V> record : records) {
-                V value = record.value();
+            for (ConsumerRecord<byte[], byte[]> record : records) {
+                byte[] value = record.value();
 
                 LOG.trace("Read from Kafka: {}", value);
-                resumeCache.add(record.key(), record.value());
 
-                if (resumeCache.isFull()) {
+                if (!deserializable.deserialize(ByteBuffer.wrap(record.key()), 
ByteBuffer.wrap(record.value()))) {
                     break;
                 }
             }
         } while (true);
-
-        unsubscribe();
     }
 
     /**
@@ -247,7 +263,7 @@ public class SingleNodeKafkaResumeStrategy<K, V> implements 
KafkaResumeStrategy<
      * Creates a new consumer rebalance listener. This can be useful for 
setting the exact Kafka offset when necessary
      * to read a limited amount of messages or customize the resume strategy 
behavior when a rebalance occurs.
      * 
-     * @param  remaining
+     * @param  remaining the number of remaining messages on the topic to try 
to collect
      * @return
      */
     protected ConsumerRebalanceListener getConsumerRebalanceListener(long 
remaining) {
@@ -292,7 +308,7 @@ public class SingleNodeKafkaResumeStrategy<K, V> implements 
KafkaResumeStrategy<
      *
      * @return An instance of the consumer records
      */
-    protected ConsumerRecords<K, V> consume() {
+    protected ConsumerRecords<byte[], byte[]> consume() {
         int retries = 10;
 
         return consume(retries);
@@ -304,9 +320,28 @@ public class SingleNodeKafkaResumeStrategy<K, V> 
implements KafkaResumeStrategy<
      * @param  retries how many times to retry consuming data from the topic
      * @return         An instance of the consumer records
      */
-    protected ConsumerRecords<K, V> consume(int retries) {
+    protected ConsumerRecords<byte[], byte[]> consume(int retries) {
         while (retries > 0) {
-            ConsumerRecords<K, V> records = consumer.poll(pollDuration);
+            ConsumerRecords<byte[], byte[]> records = 
consumer.poll(pollDuration);
+            if (!records.isEmpty()) {
+                return records;
+            }
+            retries--;
+        }
+
+        return ConsumerRecords.empty();
+    }
+
+    /**
+     * Consumes message from the topic previously setup
+     *
+     * @param  retries  how many times to retry consuming data from the topic
+     * @param  consumer the kafka consumer object instance to use
+     * @return          An instance of the consumer records
+     */
+    protected ConsumerRecords<byte[], byte[]> consume(int retries, 
Consumer<byte[], byte[]> consumer) {
+        while (retries > 0) {
+            ConsumerRecords<byte[], byte[]> records = 
consumer.poll(pollDuration);
             if (!records.isEmpty()) {
                 return records;
             }
@@ -317,8 +352,14 @@ public class SingleNodeKafkaResumeStrategy<K, V> 
implements KafkaResumeStrategy<
     }
 
     public void subscribe() throws Exception {
-        if (resumeCache.capacity() >= 1) {
-            checkAndSubscribe(topic, resumeCache.capacity());
+        if (adapter instanceof Cacheable) {
+            ResumeCache<?> cache = ((Cacheable) adapter).getCache();
+
+            if (cache.capacity() >= 1) {
+                checkAndSubscribe(topic, cache.capacity());
+            } else {
+                checkAndSubscribe(topic);
+            }
         } else {
             checkAndSubscribe(topic);
         }
@@ -326,13 +367,18 @@ public class SingleNodeKafkaResumeStrategy<K, V> 
implements KafkaResumeStrategy<
 
     @Override
     public ResumeAdapter getAdapter() {
-        return resumeAdapter;
+        return adapter;
+    }
+
+    @Override
+    public void setAdapter(ResumeAdapter adapter) {
+        this.adapter = adapter;
     }
 
     /**
      * Gets the set record of sent items
      *
-     * @return
+     * @return A collection with all the record errors
      */
     protected Collection<RecordError> getProducerErrors() {
         return Collections.unmodifiableCollection(producerErrors);
@@ -373,12 +419,6 @@ public class SingleNodeKafkaResumeStrategy<K, V> 
implements KafkaResumeStrategy<
     @Override
     public void start() {
         LOG.info("Starting the kafka resume strategy");
-
-        try {
-            loadCache();
-        } catch (Exception e) {
-            LOG.error("Failed to load already processed items: {}", 
e.getMessage(), e);
-        }
     }
 
     public Duration getPollDuration() {
@@ -389,11 +429,11 @@ public class SingleNodeKafkaResumeStrategy<K, V> 
implements KafkaResumeStrategy<
         this.pollDuration = Objects.requireNonNull(pollDuration, "The poll 
duration cannot be null");
     }
 
-    protected Consumer<K, V> getConsumer() {
+    protected Consumer<byte[], byte[]> getConsumer() {
         return consumer;
     }
 
-    protected Producer<K, V> getProducer() {
+    protected Producer<byte[], byte[]> getProducer() {
         return producer;
     }
 
@@ -409,14 +449,11 @@ public class SingleNodeKafkaResumeStrategy<K, V> 
implements KafkaResumeStrategy<
         return topic;
     }
 
-    protected ResumeCache<K, V> getResumeCache() {
-        return resumeCache;
-    }
-
     /**
      * Clear the producer errors
      */
     public void resetProducerErrors() {
         producerErrors.clear();
     }
+
 }
diff --git 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
index 57669df0920..7397109fb3a 100644
--- 
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
+++ 
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
@@ -31,6 +31,7 @@ import 
org.apache.camel.component.kafka.consumer.support.KafkaResumable;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.processor.resume.TransientResumeStrategy;
 import org.apache.camel.resume.Offset;
+import org.apache.camel.resume.OffsetKey;
 import org.apache.camel.resume.Resumable;
 import org.apache.camel.resume.ResumeAdapter;
 import org.apache.camel.resume.UpdatableConsumerResumeStrategy;
@@ -60,10 +61,9 @@ public class KafkaConsumerWithResumeRouteStrategyIT extends 
BaseEmbeddedKafkaTes
     @BindToRegistry("resumeStrategy")
     private TestUpdateStrategy resumeStrategy;
     private CountDownLatch messagesLatch;
-    private KafkaProducer<Object, Object> producer;
 
     private static class TestUpdateStrategy extends TransientResumeStrategy
-            implements UpdatableConsumerResumeStrategy<String, Integer, 
Resumable<String, Integer>> {
+            implements UpdatableConsumerResumeStrategy<Resumable> {
         private final CountDownLatch messagesLatch;
         private boolean startCalled;
         private boolean offsetNull = true;
@@ -91,25 +91,25 @@ public class KafkaConsumerWithResumeRouteStrategyIT extends 
BaseEmbeddedKafkaTes
         }
 
         @Override
-        public void updateLastOffset(Resumable<String, Integer> offset) throws 
Exception {
+        public void updateLastOffset(Resumable offset) {
             try {
                 if (offset != null) {
                     offsetNull = false;
 
-                    String addressable = offset.getAddressable();
+                    OffsetKey<?> addressable = offset.getOffsetKey();
                     if (addressable != null) {
                         offsetAddressableNull = false;
-                        offsetAddressableEmpty = addressable.isEmpty() || 
addressable.isBlank();
+                        offsetAddressableEmpty = addressable.getKey() == null;
 
                     }
 
-                    Offset<Integer> offsetValue = offset.getLastOffset();
+                    Offset<?> offsetValue = offset.getLastOffset();
                     if (offsetValue != null) {
                         offsetValueNull = false;
 
                         if (offsetValue.offset() != null) {
                             offsetValueEmpty = false;
-                            lastOffset = offsetValue.offset();
+                            lastOffset = (int) offsetValue.offset();
                         }
                     }
                 }
@@ -176,9 +176,9 @@ public class KafkaConsumerWithResumeRouteStrategyIT extends 
BaseEmbeddedKafkaTes
     @BeforeEach
     public void before() {
         Properties props = getDefaultProperties();
+        KafkaProducer<Object, Object> producer = new 
org.apache.kafka.clients.producer.KafkaProducer<>(props);
 
         for (int i = 0; i < 10; i++) {
-            producer = new 
org.apache.kafka.clients.producer.KafkaProducer<>(props);
             producer.send(new ProducerRecord<>(TOPIC, String.valueOf(i)));
         }
     }
diff --git 
a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java 
b/core/camel-api/src/main/java/org/apache/camel/resume/Cacheable.java
similarity index 54%
copy from 
core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
copy to core/camel-api/src/main/java/org/apache/camel/resume/Cacheable.java
index 15e38fa0621..1b2e9cc5a36 100644
--- a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/Cacheable.java
@@ -17,27 +17,31 @@
 
 package org.apache.camel.resume;
 
-import org.apache.camel.Service;
+import org.apache.camel.resume.cache.ResumeCache;
 
 /**
- * Defines a strategy for handling resume operations. Implementations can 
define different ways to handle how to resume
- * processing records.
+ * Used to identify objects that can cache their resume state or data
  */
-public interface ResumeStrategy extends Service {
+public interface Cacheable {
 
     /**
-     * Gets an adapter for resuming operations
+     * Adds an offset key and value to the cache
+     * @param key the key to add
+     * @param offset the offset to add
+     * @return true if added successfully (i.e.: the cache is not full) or 
false otherwise
      */
-    ResumeAdapter getAdapter();
+    boolean add(OffsetKey<?> key, Offset<?> offset);
 
     /**
-     * Gets and adapter for resuming operations
-     * 
-     * @param  clazz the class of the adapter
-     * @return       the adapter or null if it can't be cast to the requested 
class
-     * @param  <T>   the type of the adapter
+     * Sets the cache in resume adapters and objects that cache their data
+     * @param cache A resume cache instance
      */
-    default <T extends ResumeAdapter> T getAdapter(Class<T> clazz) {
-        return clazz.cast(getAdapter());
-    }
+    void setCache(ResumeCache<?> cache);
+
+
+    /**
+     * Gets the cache in resume adapters and objects that cache their data
+     * @return A resume cache instance
+     */
+    ResumeCache<?> getCache();
 }
diff --git 
a/core/camel-api/src/main/java/org/apache/camel/resume/Deserializable.java 
b/core/camel-api/src/main/java/org/apache/camel/resume/Deserializable.java
new file mode 100644
index 00000000000..630dc7bc2d6
--- /dev/null
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/Deserializable.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.resume;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+
+public interface Deserializable {
+    /** Decodes a type-tagged buffer (int tag, then payload); returns null when the tag is not a known Serializable.TYPE_* value. */
+    default Object deserializeObject(ByteBuffer buffer) {
+        buffer.clear(); // position = 0, limit = capacity: always decode from the start of the buffer
+        // the leading int selects how the remaining bytes are interpreted
+        int dataType = buffer.getInt();
+        switch (dataType) {
+            case Serializable.TYPE_INTEGER: {
+                return buffer.getInt();
+            }
+            case Serializable.TYPE_LONG: {
+                return buffer.getLong();
+            }
+            case Serializable.TYPE_STRING: {
+                // read exactly what is left (as TYPE_FILE does): get(dst, 0, 1024) throws BufferUnderflowException unless 1024 bytes remain
+                byte[] tmp = new byte[buffer.remaining()];
+                buffer.get(tmp);
+                return new String(tmp);
+            }
+            case Serializable.TYPE_FILE: {
+                int remaining = buffer.remaining();
+                byte[] tmp = new byte[remaining];
+                buffer.get(tmp);
+                return new File(new String(tmp));
+            }
+            default: {
+                return null;
+            }
+        }
+    }
+
+    /** Consumes one serialized key/value pair; a false result presumably asks the caller to stop feeding records (confirm per implementation). */
+    boolean deserialize(ByteBuffer keyBuffer, ByteBuffer valueBuffer);
+}
diff --git a/core/camel-api/src/main/java/org/apache/camel/resume/Offset.java 
b/core/camel-api/src/main/java/org/apache/camel/resume/Offset.java
index c41f312d49a..a6e5519f296 100644
--- a/core/camel-api/src/main/java/org/apache/camel/resume/Offset.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/Offset.java
@@ -17,12 +17,20 @@
 
 package org.apache.camel.resume;
 
+import java.nio.ByteBuffer;
+
 /**
  * Generic offset without a concrete type
  *
  * @param <T> the type of the offset
  */
-public interface Offset<T> {
+public interface Offset<T> extends Serializable {
+
+    /**
+     * Sets the current offset value
+     * @param offset the current offset value
+     */
+    void update(T offset);
 
     /**
      * Gets the offset value
@@ -31,4 +39,8 @@ public interface Offset<T> {
      */
     T offset();
 
+    @Override
+    default ByteBuffer serialize() {
+        return serialize(offset());
+    }
 }
diff --git 
a/core/camel-api/src/main/java/org/apache/camel/resume/cache/SingleEntryCache.java
 b/core/camel-api/src/main/java/org/apache/camel/resume/OffsetKey.java
similarity index 63%
rename from 
core/camel-api/src/main/java/org/apache/camel/resume/cache/SingleEntryCache.java
rename to core/camel-api/src/main/java/org/apache/camel/resume/OffsetKey.java
index 2928dd8a26f..4299dbbdeef 100644
--- 
a/core/camel-api/src/main/java/org/apache/camel/resume/cache/SingleEntryCache.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/OffsetKey.java
@@ -15,24 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.camel.resume.cache;
+package org.apache.camel.resume;
 
-import java.util.Optional;
+import java.nio.ByteBuffer;
 
 /**
- * A resume cache where a single key can only be mapped to a single entry
- *
- * @param <K> the type the key
- * @param <V> the type of the entry
+ * An interface to represent offset keys (addressable for an offset)
+ * @param <K> the type of the offset key
  */
-public interface SingleEntryCache<K, V> extends ResumeCache<K, V> {
+public interface OffsetKey<K> extends Serializable {
+    /**
+     * Sets the key
+     * @param key the key value
+     */
+    void setKey(K key);
 
     /**
-     * Gets the offset value for the key
-     *
-     * @param  key the key
-     * @return     the key
+     * Gets the key
+     * @return the key instance
      */
-    Optional<V> get(K key);
+    K getKey();
 
+    @Override
+    default ByteBuffer serialize() {
+        return serialize(getKey());
+    }
 }
diff --git 
a/core/camel-api/src/main/java/org/apache/camel/resume/Resumable.java 
b/core/camel-api/src/main/java/org/apache/camel/resume/Resumable.java
index 108af04293a..586616c960d 100644
--- a/core/camel-api/src/main/java/org/apache/camel/resume/Resumable.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/Resumable.java
@@ -22,30 +22,21 @@ package org.apache.camel.resume;
  * For example, when reading large files, it may be possible to inform the 
last offset that was read, thus allowing
  * users of this interface to skip to that offset. This can potentially 
improve resumable operations by allowing
  * reprocessing of data.
- *
- * @param <Y> the type of the key, name or object that can be addressed by the 
given offset
- * @param <T> the type of the addressable value for the resumable object (for 
example, a file would use a Long value)
  */
-public interface Resumable<Y, T> {
+public interface Resumable {
 
     /**
-     * Updates the last offset as appropriate for the user of the interface
+     * Gets the offset key (i.e.: the addressable part of the resumable object)
      *
-     * @param offset the offset value
+     * @return An OffsetKey instance with the addressable part of the object. 
May return null or an EmptyOffset
+     * depending on the type of the resumable
      */
-    void updateLastOffset(T offset);
+    OffsetKey<?> getOffsetKey();
 
     /**
      * Gets the last offset value
      * 
      * @return the last offset value according to the interface and type 
implemented
      */
-    Offset<T> getLastOffset();
-
-    /**
-     * Gets the addressable part (key) of the resumable
-     * 
-     * @return the addressable part of the resumable
-     */
-    Y getAddressable();
+    Offset<?> getLastOffset();
 }
diff --git 
a/core/camel-api/src/main/java/org/apache/camel/resume/ResumableSet.java 
b/core/camel-api/src/main/java/org/apache/camel/resume/ResumableSet.java
deleted file mode 100644
index d09a61f7e72..00000000000
--- a/core/camel-api/src/main/java/org/apache/camel/resume/ResumableSet.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.resume;
-
-import java.lang.reflect.Array;
-import java.util.Arrays;
-import java.util.function.Predicate;
-
-/**
- * An interface that represents a set of resumables (i.e.: files in a 
directory, rows in a database, etc)
- *
- * @param <T> the indivudal type of each member of the set
- */
-public interface ResumableSet<T> {
-
-    /**
-     * Iterates over the set of input checking if they should be resumed or 
not.
-     *
-     * @param  input          the input array to check for resumables
-     * @param  resumableCheck a checker method that returns true if a single 
entry of the input should be resumed or
-     *                        false otherwise. For instance: given a set A, B 
and C, where B has already been processed,
-     *                        then a test for A and C returns true, whereas a 
test for B returns false.
-     * @return                a new array containing the elements that still 
need to be processed
-     */
-    default T[] resumeEach(T[] input, Predicate<T> resumableCheck) {
-        @SuppressWarnings("unchecked")
-        T[] tmp = (T[]) Array.newInstance(input.getClass().getComponentType(), 
input.length);
-        int count = 0;
-
-        for (T entry : input) {
-            if (resumableCheck.test(entry)) {
-                tmp[count] = entry;
-                count++;
-            }
-        }
-
-        if (count != input.length) {
-            return Arrays.copyOf(tmp, count);
-        }
-
-        return input;
-    }
-
-    /**
-     * Iterates over the set of input checking if they should be resumed or not
-     *
-     * @param resumableCheck a checker method that returns true if a single 
entry of the input should be resumed or
-     *                       false otherwise. For instance: given a set A, B 
and C, where B has already been processed,
-     *                       then a test for A and C returns true, whereas a 
test for B returns false.
-     */
-    void resumeEach(Predicate<T> resumableCheck);
-
-    /**
-     * Gets the input that should be resumed
-     *
-     * @return an array with the input that should be resumed
-     */
-    T[] resumed();
-
-    /**
-     * Whether there are resumable entries to process
-     *
-     * @return true if there are resumable entries or false otherwise
-     */
-    boolean hasResumables();
-}
diff --git 
a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeAdapter.java 
b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeAdapter.java
index 1966c815c58..ae650304d3c 100644
--- a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeAdapter.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeAdapter.java
@@ -21,6 +21,9 @@ package org.apache.camel.resume;
  * A resume adapter provides the component-specific logic that plugs the more 
generic strategic with the lower level
  * requirements of the component being used.
  *
+ * The adapter class responsibility is to bind the component-specific part of 
the logic to the more generic handling of
+ * the resume strategy. The adapter is always component specific and some 
components may have more than one.
+ *
  * It is the responsibility of the supported components to implement the 
custom implementation for this part of the
  * resume API, as well as to offer component-specific interfaces that can be 
specialized by other integrations.
  */
diff --git 
a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java 
b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
index 15e38fa0621..a1ffc6c67cc 100644
--- a/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/ResumeStrategy.java
@@ -25,6 +25,12 @@ import org.apache.camel.Service;
  */
 public interface ResumeStrategy extends Service {
 
+    /**
+     * Sets an adapter for resuming operations with this strategy
+     * @param adapter    the component-specific resume adapter
+     */
+    void setAdapter(ResumeAdapter adapter);
+
     /**
      * Gets an adapter for resuming operations
      */
@@ -40,4 +46,13 @@ public interface ResumeStrategy extends Service {
     default <T extends ResumeAdapter> T getAdapter(Class<T> clazz) {
         return clazz.cast(getAdapter());
     }
+
+
+    /**
+     * Loads the cache with the data currently available in this strategy
+     * @throws Exception if the cache cannot be loaded
+     */
+    default void loadCache() throws Exception {
+
+    }
 }
diff --git 
a/core/camel-api/src/main/java/org/apache/camel/resume/Serializable.java 
b/core/camel-api/src/main/java/org/apache/camel/resume/Serializable.java
new file mode 100644
index 00000000000..1cf134eee55
--- /dev/null
+++ b/core/camel-api/src/main/java/org/apache/camel/resume/Serializable.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.resume;
+
+
+import java.io.File;
+import java.nio.ByteBuffer;
+
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * An interface that represents resumable objects that can be serialized to a 
medium
+ */
+@FunctionalInterface
+public interface Serializable {
+    int TYPE_INTEGER = 0;
+    int TYPE_LONG = 1;
+    int TYPE_STRING = 2;
+    int TYPE_FILE = 3;
+
+    int BYTES = 1024;
+
+    /**
+     * Serializes the given object into a buffer of bytes, prefixed with an integer type marker
+     * @param obj the object to serialize (must not be null)
+     * @return a ByteBuffer with the serialized contents of the object, or null if its type is unsupported
+     */
+    default ByteBuffer serialize(Object obj) {
+        ObjectHelper.notNull(obj, "Cannot perform serialization on a null 
object");
+
+        if (obj instanceof Long) {
+            ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES + 
Long.BYTES);
+            buffer.putInt(TYPE_LONG);
+
+            long data = ((Long) obj).longValue();
+            buffer.putLong(data);
+            return buffer;
+        }
+        if (obj instanceof String) {
+            byte[] data = ((String) obj).getBytes();
+
+            ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES + 
data.length);
+            buffer.putInt(TYPE_STRING);
+            buffer.put(data);
+
+            return buffer;
+        }
+        if (obj instanceof File) {
+
+            byte[] data = ((File) obj).getPath().getBytes();
+            ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES + 
data.length);
+
+            buffer.putInt(TYPE_FILE);
+            buffer.put(data);
+
+            return buffer;
+        }
+
+        return null;
+    }
+
+    /**
+     * Serializes this object into a buffer of bytes
+     * @return a ByteBuffer instance with the serialized contents of this 
object
+     */
+    ByteBuffer serialize();
+}
diff --git 
a/core/camel-api/src/main/java/org/apache/camel/resume/UpdatableConsumerResumeStrategy.java
 
b/core/camel-api/src/main/java/org/apache/camel/resume/UpdatableConsumerResumeStrategy.java
index af6e682f198..52204789bc0 100644
--- 
a/core/camel-api/src/main/java/org/apache/camel/resume/UpdatableConsumerResumeStrategy.java
+++ 
b/core/camel-api/src/main/java/org/apache/camel/resume/UpdatableConsumerResumeStrategy.java
@@ -20,11 +20,9 @@ package org.apache.camel.resume;
 /**
  * An updatable resume strategy
  *
- * @param <K> the type of the key, name or object that can be addressed by the 
given offset
  * @param <T> the type of the addressable value for the resumable object (for 
example, a file would use a Long value)
- * @param <T> a resumable type capable of handling/storing/holding resumable 
information for the addressable and offset
  */
-public interface UpdatableConsumerResumeStrategy<K, V, T extends Resumable<K, 
V>> {
+public interface UpdatableConsumerResumeStrategy<T extends Resumable> {
 
     /**
      * Updates the last processed offset
diff --git 
a/core/camel-api/src/main/java/org/apache/camel/resume/cache/MultiEntryCache.java
 
b/core/camel-api/src/main/java/org/apache/camel/resume/cache/MultiEntryCache.java
deleted file mode 100644
index 0708d1ddff0..00000000000
--- 
a/core/camel-api/src/main/java/org/apache/camel/resume/cache/MultiEntryCache.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.resume.cache;
-
-/**
- * A cache where an entry can point to one or more entries. For instance, a 
path as the key and the file entries as its
- * entries
- *
- * @param <K> the type of the key
- * @param <V> the type of the value
- */
-public interface MultiEntryCache<K, V> extends ResumeCache<K, V> {
-
-}
diff --git 
a/core/camel-api/src/main/java/org/apache/camel/resume/cache/ResumeCache.java 
b/core/camel-api/src/main/java/org/apache/camel/resume/cache/ResumeCache.java
index c5fbedd1fa5..a6eb7737344 100644
--- 
a/core/camel-api/src/main/java/org/apache/camel/resume/cache/ResumeCache.java
+++ 
b/core/camel-api/src/main/java/org/apache/camel/resume/cache/ResumeCache.java
@@ -17,15 +17,34 @@
 
 package org.apache.camel.resume.cache;
 
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
 import org.apache.camel.resume.ResumeStrategy;
 
 /**
  * This cache stored the resumed data from a {@link ResumeStrategy}.
  * 
  * @param <K> the type of the key
- * @param <V> the type of the offset value
  */
-public interface ResumeCache<K, V> {
+public interface ResumeCache<K> {
+
+    /**
+     * If the specified key is not present, compute its value from the mapping 
function (like Java's standard Map one)
+     * @param key the key to get or associate with the value
+     * @param mapping the mapping function used to compute the value
+     * @return the value associated with the key (either the present or the 
one computed from the mapping function)
+     */
+    Object computeIfAbsent(K key, Function<? super K, ? super Object> mapping);
+
+    /**
+     * If the specified key is present, compute a new value from the mapping 
function (like Java's standard Map one)
+     * @param key the key to get or associate with the value
+     * @param remapping the remapping function used to compute the new value
+     * @return the value associated with the key after applying the remapping function
+     */
+    Object computeIfPresent(K key, BiFunction<? super K, ? super Object, ? 
super Object> remapping);
+
 
     /**
      * Whether the cache contains the key with the given entry value
@@ -34,7 +53,7 @@ public interface ResumeCache<K, V> {
      * @param  entry the entry
      * @return       true if the key/entry pair is stored in the cache
      */
-    boolean contains(K key, V entry);
+    boolean contains(K key, Object entry);
 
     /**
      * Adds a value to the cache
@@ -42,7 +61,7 @@ public interface ResumeCache<K, V> {
      * @param key         the key to add
      * @param offsetValue the offset value
      */
-    void add(K key, V offsetValue);
+    void add(K key, Object offsetValue);
 
     /**
      * Checks whether the cache is full
@@ -55,4 +74,23 @@ public interface ResumeCache<K, V> {
      * Gets the cache pool size
      */
     long capacity();
+
+
+    /**
+     * Gets the offset entry for the key
+     *
+     * @param  key the key
+     * @param clazz the class object representing the value to be obtained
+     * @return     the offset value as an instance of the given class
+     */
+    <T> T get(K key, Class<T> clazz);
+
+
+    /**
+     * Gets the offset entry for the key
+     *
+     * @param  key the key
+     * @return     the offset value
+     */
+    Object get(K key);
 }
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
index 3a9d4671088..f065ffcddc6 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultRoute.java
@@ -42,6 +42,7 @@ import org.apache.camel.Suspendable;
 import org.apache.camel.SuspendableService;
 import org.apache.camel.resume.ConsumerListener;
 import org.apache.camel.resume.ConsumerListenerAware;
+import org.apache.camel.resume.ResumeAdapter;
 import org.apache.camel.resume.ResumeAware;
 import org.apache.camel.resume.ResumeStrategy;
 import org.apache.camel.spi.IdAware;
@@ -54,6 +55,7 @@ import org.apache.camel.spi.RouteIdAware;
 import org.apache.camel.spi.RoutePolicy;
 import org.apache.camel.support.LoggerHelper;
 import org.apache.camel.support.PatternHelper;
+import org.apache.camel.support.resume.AdapterHelper;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.support.service.ServiceSupport;
 import org.apache.camel.util.ObjectHelper;
@@ -641,7 +643,9 @@ public class DefaultRoute extends ServiceSupport implements 
Route {
                 ((RouteIdAware) consumer).setRouteId(this.getId());
             }
 
-            if (consumer instanceof ResumeAware) {
+            if (consumer instanceof ResumeAware && resumeStrategy != null) {
+                ResumeAdapter resumeAdapter = 
AdapterHelper.eval(getCamelContext(),consumer);
+                resumeStrategy.setAdapter(resumeAdapter);
                 ((ResumeAware) consumer).setResumeStrategy(resumeStrategy);
             }
 
diff --git 
a/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
 
b/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
index bcaee49fecf..fb3a637cca5 100644
--- 
a/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
+++ 
b/core/camel-core-engine/src/main/docs/modules/eips/pages/resume-strategies.adoc
@@ -34,7 +34,8 @@ This instance can be bound in the Context registry as follows:
 
 [source,java]
 ----
-getCamelContext().getRegistry().bind("testResumeStrategy", new 
MyTestResumeStrategy(new MyAdapter()));
+getCamelContext().getRegistry().bind("testResumeStrategy", new 
MyTestResumeStrategy());
+getCamelContext().getRegistry().bind("resumeCache", new 
MyChoiceOfResumeCache<>(100));
 
 from("some:component")
     .resumable("testResumeStrategy")
@@ -45,53 +46,20 @@ Or the instance can be constructed as follows:
 
 [source,java]
 ----
+getCamelContext().getRegistry().bind("resumeCache", new 
MyChoiceOfResumeCache<>(100));
+
 from("some:component")
-    .resumable(new MyTestResumeStrategy(new MyAdapter()))
+    .resumable(new MyTestResumeStrategy())
     .process(this::process)
 ----
 
-=== The Resume Adapter
-
-The adapter class responsibility is to bind the component-specific part of the 
logic to the more generic handling of the
-resume strategy. The adapter is always component specific and some components 
may have more than one. Integrations with
-more complex resume processes, may implement their own adapters, although the 
builtin ones should be useful in most of the
-cases. Currently, the following adapters are available:
-
-* camel-atom: `org.apache.camel.component.feed.EntryFilter`
-* camel-aws2-kinesis: 
`org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeAdapter`
-* camel-cassandracql: 
`org.apache.camel.component.cassandra.consumer.support.CassandraResumeAdapter`
-* camel-couchbase: 
`org.apache.camel.component.couchbase.CouchbaseResumeAdapter`
-* camel-couchdb: 
`org.apache.camel.component.couchdb.consumer.CouchDbResumeAdapter`
-* camel-file: 
`org.apache.camel.component.file.consumer.adapters.FileSetResumeAdapter` for 
directories
-* camel-file: 
`org.apache.camel.component.file.consumer.adapters.GenericFileResumeAdapter` 
for files
-* camel-kafka: 
`org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeAdapter`
-* camel-rss: `org.apache.camel.component.feed.EntryFilter`
-* generic: `org.apache.camel.processor.resume.DelegatingResumeAdapter`
-
-Note: in the future, these adapters will be resolved automatically by Camel.
-
-== The Resume API Interfaces
-
-These are the *core interfaces*:
-
-* `org.apache.camel.resume.ResumeStrategy` - the resume strategy service
-* `org.apache.camel.resume.ResumeAdapter` - an adapter that binds the generic 
parts of the resume strategy with the component
-* `org.apache.camel.resume.UpdatableConsumerResumeStrategy` - an extension to 
the resume strategy to allow updatable strategies
-* `org.apache.camel.resume.cache.ResumeCache` - the base interface for local 
cache for resumable information
-* `org.apache.camel.resume.cache.SingleEntryCache` - an interface for local 
cache for resumable information where there is a one-to-one relationship 
between cache a key and its entry (i.e: a file and its offset)
-* `org.apache.camel.resume.cache.MultiEntryCache` - an interface for local 
cache for resumable information where there is a one-to-many relationship 
between the cache a keys and its entries (i.e.: a path and its file entries)
-
-These are the *core classes* supporting the strategies:
-
-* `org.apache.camel.resume.Resumable` - an interface to allow users to work 
with abstract resumable entities (files, offsets, etc)
-* `org.apache.camel.resume.ResumableSet` - an interface for resumables with a 
1-to-many relationship
-* `org.apache.camel.resume.Offset` - a generic offset without a concrete type 
(it may represent a long, a file name, etc)
-
-These are the *supporting classes*:
+In some circumstances, such as when dealing with File I/O, it may be necessary 
to set the offset manually. There are
+*supporting classes* that can help work with resumables:
 
 * `org.apache.camel.support.Resumables` - resumables handling support
 * `org.apache.camel.support.Offsets` - offset handling support
 
+
 == Builtin Resume Strategies
 
 Camel comes with a few builtin strategies that can be used to store, retrieve 
and update the offsets. The following strategies are available:
@@ -108,8 +76,7 @@ New builtin resume strategies can be created by implementing 
the `UpdatableConsu
 
 A sample local cache implemented using 
https://github.com/ben-manes/caffeine[Caffeine].
 
-* `org.apache.camel.component.caffeine.resume.single.CaffeineCache`: for data 
with where 1 key can only point to 1 entry (1-to-1 relationship)
-* `org.apache.camel.component.caffeine.resume.multi.CaffeineCache`: for data 
with where 1 key can point to 1 or more entries (1-to-many relationship)
+* `org.apache.camel.component.caffeine.resume.CaffeineCache`
 
 == Known Limitations
 
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/DelegatingResumeAdapter.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/DelegatingResumeAdapter.java
deleted file mode 100644
index 6aee7fd8f4f..00000000000
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/DelegatingResumeAdapter.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.processor.resume;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Spliterator;
-import java.util.function.Consumer;
-import java.util.function.IntFunction;
-import java.util.function.Predicate;
-import java.util.function.UnaryOperator;
-import java.util.stream.Stream;
-
-import org.apache.camel.resume.ResumeAdapter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A delegating adapter that can be used to delegate to and/or abstract resume 
adapters
- */
-public class DelegatingResumeAdapter implements ResumeAdapter {
-    private static final Logger LOG = 
LoggerFactory.getLogger(DelegatingResumeAdapter.class);
-
-    private final List<ResumeAdapter> resumeStrategies;
-
-    public DelegatingResumeAdapter() {
-        resumeStrategies = new ArrayList<>();
-    }
-
-    protected DelegatingResumeAdapter(List<ResumeAdapter> resumeStrategies) {
-        this.resumeStrategies = resumeStrategies;
-    }
-
-    public boolean add(ResumeAdapter resumeAdapter) {
-        return resumeStrategies.add(resumeAdapter);
-    }
-
-    public boolean remove(Object resumeAdapter) {
-        return resumeStrategies.remove(resumeAdapter);
-    }
-
-    public boolean removeIf(Predicate<? super ResumeAdapter> filter) {
-        return resumeStrategies.removeIf(filter);
-    }
-
-    @Override
-    public void resume() {
-        resumeStrategies.forEach(ResumeAdapter::resume);
-    }
-
-    public int size() {
-        return resumeStrategies.size();
-    }
-
-    public boolean isEmpty() {
-        return resumeStrategies.isEmpty();
-    }
-
-    public boolean contains(Object o) {
-        return resumeStrategies.contains(o);
-    }
-
-    public Iterator<ResumeAdapter> iterator() {
-        return resumeStrategies.iterator();
-    }
-
-    public Object[] toArray() {
-        return resumeStrategies.toArray();
-    }
-
-    public <T> T[] toArray(T[] a) {
-        return resumeStrategies.toArray(a);
-    }
-
-    public boolean containsAll(Collection<?> c) {
-        return resumeStrategies.containsAll(c);
-    }
-
-    public boolean addAll(Collection<? extends ResumeAdapter> c) {
-        return resumeStrategies.addAll(c);
-    }
-
-    public boolean addAll(int index, Collection<? extends ResumeAdapter> c) {
-        return resumeStrategies.addAll(index, c);
-    }
-
-    public boolean removeAll(Collection<?> c) {
-        return resumeStrategies.removeAll(c);
-    }
-
-    public boolean retainAll(Collection<?> c) {
-        return resumeStrategies.retainAll(c);
-    }
-
-    public void replaceAll(UnaryOperator<ResumeAdapter> operator) {
-        resumeStrategies.replaceAll(operator);
-    }
-
-    public void sort(Comparator<? super ResumeAdapter> c) {
-        resumeStrategies.sort(c);
-    }
-
-    public void clear() {
-        resumeStrategies.clear();
-    }
-
-    public ResumeAdapter get(int index) {
-        return resumeStrategies.get(index);
-    }
-
-    public ResumeAdapter set(int index, ResumeAdapter element) {
-        return resumeStrategies.set(index, element);
-    }
-
-    public void add(int index, ResumeAdapter element) {
-        resumeStrategies.add(index, element);
-    }
-
-    public ResumeAdapter remove(int index) {
-        return resumeStrategies.remove(index);
-    }
-
-    public int indexOf(Object o) {
-        return resumeStrategies.indexOf(o);
-    }
-
-    public int lastIndexOf(Object o) {
-        return resumeStrategies.lastIndexOf(o);
-    }
-
-    public ListIterator<ResumeAdapter> listIterator() {
-        return resumeStrategies.listIterator();
-    }
-
-    public ListIterator<ResumeAdapter> listIterator(int index) {
-        return resumeStrategies.listIterator(index);
-    }
-
-    public List<ResumeAdapter> subList(int fromIndex, int toIndex) {
-        return resumeStrategies.subList(fromIndex, toIndex);
-    }
-
-    public Spliterator<ResumeAdapter> spliterator() {
-        return resumeStrategies.spliterator();
-    }
-
-    public <T> T[] toArray(IntFunction<T[]> generator) {
-        return resumeStrategies.toArray(generator);
-    }
-
-    public Stream<ResumeAdapter> stream() {
-        return resumeStrategies.stream();
-    }
-
-    public Stream<ResumeAdapter> parallelStream() {
-        return resumeStrategies.parallelStream();
-    }
-
-    public void forEach(Consumer<? super ResumeAdapter> action) {
-        resumeStrategies.forEach(action);
-    }
-}
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java
index aa347da63e1..a2185ccfc65 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/ResumableCompletion.java
@@ -48,10 +48,10 @@ public class ResumableCompletion implements Synchronization 
{
         Object offset = 
ExchangeHelper.getResultMessage(exchange).getHeader(Exchange.OFFSET);
 
         if (offset instanceof Resumable) {
-            Resumable<?, ?> resumable = (Resumable<?, ?>) offset;
+            Resumable resumable = (Resumable) offset;
 
             if (LOG.isTraceEnabled()) {
-                LOG.trace("Processing the resumable: {}", 
resumable.getAddressable());
+                LOG.trace("Processing the resumable: {}", 
resumable.getOffsetKey());
                 LOG.trace("Processing the resumable of type: {}", 
resumable.getLastOffset().offset());
             }
 
@@ -75,14 +75,14 @@ public class ResumableCompletion implements Synchronization 
{
     @Override
     public void onFailure(Exchange exchange) {
         Exception e = exchange.getException();
-        Object offset = exchange.getMessage().getHeader(Exchange.OFFSET);
+        Object resObj = exchange.getMessage().getHeader(Exchange.OFFSET);
 
-        if (offset instanceof Resumable) {
-            Resumable<?, ?> resumable = (Resumable<?, ?>) offset;
+        if (resObj instanceof Resumable) {
+            Resumable resumable = (Resumable) resObj;
 
             String logMessage = String.format(
                     "Skipping offset update with address '%s' and offset value 
'%s' due to failure in processing: %s",
-                    resumable.getAddressable(), 
resumable.getLastOffset().offset(), e.getMessage());
+                    resumable.getOffsetKey(), 
resumable.getLastOffset().offset(), e.getMessage());
 
             if (LOG.isDebugEnabled() || CamelLogger.shouldLog(LOG, 
loggingLevel)) {
                 CamelLogger.log(LOG, LoggingLevel.DEBUG, logMessage, e);
@@ -93,7 +93,7 @@ public class ResumableCompletion implements Synchronization {
             }
         } else {
             String logMessage = String.format("Skipping offset update of '%s' 
due to failure in processing: %s",
-                    offset == null ? "type null" : "unspecified type", 
e.getMessage());
+                    resObj == null ? "type null" : "unspecified type", 
e.getMessage());
 
             if (LOG.isDebugEnabled() || CamelLogger.shouldLog(LOG, 
loggingLevel)) {
                 CamelLogger.log(LOG, LoggingLevel.DEBUG, logMessage, e);
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
index b0ffd26c080..205612b17e9 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/resume/TransientResumeStrategy.java
@@ -31,11 +31,21 @@ public class TransientResumeStrategy implements 
ResumeStrategy {
         this.resumeAdapter = resumeAdapter;
     }
 
+    @Override
+    public void setAdapter(ResumeAdapter adapter) {
+
+    }
+
     @Override
     public ResumeAdapter getAdapter() {
         return resumeAdapter;
     }
 
+    @Override
+    public <T extends ResumeAdapter> T getAdapter(Class<T> clazz) {
+        return ResumeStrategy.super.getAdapter(clazz);
+    }
+
     @Override
     public void start() {
         // this is NO-OP
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
index e3ac865298f..9ffe7489dc9 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
@@ -20,15 +20,16 @@ import java.io.File;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.file.consumer.GenericFileResumable;
-import org.apache.camel.component.file.consumer.GenericFileResumeAdapter;
+import org.apache.camel.component.file.consumer.DirectoryEntriesResumeAdapter;
+import org.apache.camel.component.file.consumer.FileOffsetResumeAdapter;
+import org.apache.camel.component.file.consumer.FileResumeAdapter;
+import org.apache.camel.component.file.consumer.adapters.DirectoryEntries;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.processor.resume.TransientResumeStrategy;
 import org.apache.camel.resume.Resumable;
@@ -37,37 +38,57 @@ import org.apache.camel.support.resume.Resumables;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class FileConsumerResumeFromOffsetStrategyTest extends 
ContextTestSupport {
+    private static final Logger LOG = 
LoggerFactory.getLogger(FileConsumerResumeFromOffsetStrategyTest.class);
+
+    private static class TestFileResumeAdapter implements FileResumeAdapter, 
FileOffsetResumeAdapter {
+        private GenericFile<File> resumable;
+        private DirectoryEntries fileSet;
 
-    private static class TestFileResumeAdapter implements 
GenericFileResumeAdapter {
         @Override
-        public void resume(GenericFileResumable<File> resumable) {
-            if 
(!resumable.getAddressable().getName().startsWith("resume-from-offset")) {
+        public void setResumePayload(GenericFile<File> resumable) {
+            if 
(!resumable.getFile().getName().startsWith("resume-from-offset")) {
                 throw new RuntimeCamelException("Invalid file - resume 
strategy should not have been called!");
             }
 
-            resumable.updateLastOffset(3L);
+            this.resumable = resumable;
         }
 
         @Override
         public void resume() {
-            throw new UnsupportedOperationException("Unsupported operation");
-            // NO-OP
-        }
+            if (resumable != null) {
+                resumable.updateLastOffset(3L);
+                resumable = null;
+            }
 
-        @Override
-        public Optional<Long> getLastOffset(File addressable) {
-            return Optional.empty();
+            if (fileSet != null) {
+                DirectoryEntries.doResume(fileSet, f -> 
!f.getName().equals("resume-from-offset"));
+                LOG.debug("Fileset: {}", fileSet);
+                LOG.debug("Fileset: {}", fileSet.resumed());
+
+                fileSet = null;
+            }
         }
     }
 
-    private static class FailResumeAdapter extends TestFileResumeAdapter
-            implements UpdatableConsumerResumeStrategy<File, Long, 
Resumable<File, Long>> {
+    private static class FailResumeAdapter implements FileResumeAdapter, 
DirectoryEntriesResumeAdapter,UpdatableConsumerResumeStrategy<Resumable> {
         private boolean called;
 
         @Override
-        public void updateLastOffset(Resumable<File, Long> offset) {
+        public void resume() {
+
+        }
+
+        @Override
+        public void setResumePayload(DirectoryEntries fileSet) {
+            DirectoryEntries.doResume(fileSet, f -> true);
+        }
+
+        @Override
+        public void updateLastOffset(Resumable offset) {
             called = true;
         }
     }
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
index aee27bb8fe7..5241e1571ec 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java
@@ -24,40 +24,46 @@ import java.util.Objects;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.file.consumer.FileResumeSet;
-import org.apache.camel.component.file.consumer.FileSetResumeAdapter;
+import org.apache.camel.component.file.consumer.DirectoryEntriesResumeAdapter;
+import org.apache.camel.component.file.consumer.FileResumeAdapter;
+import org.apache.camel.component.file.consumer.adapters.DirectoryEntries;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.processor.resume.TransientResumeStrategy;
 import org.apache.camel.support.resume.Resumables;
+import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Test;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@DisplayName("Tests whether file consumer works with the resume strategy")
 public class FileConsumerResumeStrategyTest extends ContextTestSupport {
 
-    private static class TestFileSetResumeAdapter implements 
FileSetResumeAdapter {
-        private List<String> processedFiles = Arrays.asList("0.txt", "1.txt", 
"2.txt");
-        private FileResumeSet resumeSet;
+    private static class TestFileSetResumeAdapter implements 
FileResumeAdapter, DirectoryEntriesResumeAdapter {
+        private final List<String> processedFiles = Arrays.asList("0.txt", 
"1.txt", "2.txt");
+        private DirectoryEntries resumeSet;
 
         @Override
-        public void resume(FileResumeSet resumeSet) {
+        public void setResumePayload(DirectoryEntries resumeSet) {
             this.resumeSet = Objects.requireNonNull(resumeSet);
-
-            resume();
         }
 
         @Override
         public void resume() {
-            if (resumeSet != null) {
-                resumeSet.resumeEach(f -> 
!processedFiles.contains(f.getName()));
-            }
+            DirectoryEntries.doResume(resumeSet, f -> 
!processedFiles.contains(f.getName()));
         }
     }
 
+    private final TestFileSetResumeAdapter adapter = new 
TestFileSetResumeAdapter();
+
     private static Map<String, Object> headerFor(int num) {
         String name = num + ".txt";
 
         return Map.of(Exchange.FILE_NAME, name);
     }
 
+    @DisplayName("Tests whether it can resume processing of directory entries")
     @Test
     public void testResume() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
@@ -73,6 +79,10 @@ public class FileConsumerResumeStrategyTest extends 
ContextTestSupport {
 
         // only expect 4 of the 6 sent
         assertMockEndpointsSatisfied();
+
+        assertTrue(adapter.resumeSet.hasResumables(), "The resume set should 
have resumables in this scenario");
+        assertNotNull(adapter.resumeSet.resumed(), "The list of resumables 
should not be null");
+        assertEquals(4, adapter.resumeSet.resumed().length, "There should be 
exactly 4 resumables");
     }
 
     private void setOffset(Exchange exchange) {
@@ -85,12 +95,12 @@ public class FileConsumerResumeStrategyTest extends 
ContextTestSupport {
     }
 
     @Override
-    protected RouteBuilder createRouteBuilder() throws Exception {
+    protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
 
-                bindToRegistry("testResumeStrategy", new 
TransientResumeStrategy(new TestFileSetResumeAdapter()));
+                bindToRegistry("testResumeStrategy", new 
TransientResumeStrategy(adapter));
 
                 from(fileUri("resume?noop=true&recursive=true"))
                         .resumable("testResumeStrategy")
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/resume/AdapterHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/resume/AdapterHelper.java
new file mode 100644
index 00000000000..86b066a4667
--- /dev/null
+++ b/core/camel-support/src/main/java/org/apache/camel/support/resume/AdapterHelper.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.support.resume;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Properties;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Consumer;
+import org.apache.camel.resume.Cacheable;
+import org.apache.camel.resume.ResumeAdapter;
+import org.apache.camel.resume.cache.ResumeCache;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Resolves and configures the {@link ResumeAdapter} used by a resumable consumer.
+ * <p>
+ * An adapter bound to the registry as {@code resumeAdapter} takes precedence. Otherwise, the adapter class named by
+ * the {@code adapterClass} property of the {@code /org/apache/camel/resume/adapter.properties} resource, loaded
+ * through the consumer class, is instantiated via its no-argument constructor.
+ */
+public final class AdapterHelper {
+    private static final Logger LOG = LoggerFactory.getLogger(AdapterHelper.class);
+    private static final String ADAPTER_PROPERTIES = "/org/apache/camel/resume/adapter.properties";
+    private static final String PROP_ADAPTER_CLASS = "adapterClass";
+
+    private AdapterHelper() {
+    }
+
+    /**
+     * Finds the resume adapter for the given consumer and, when possible, wires the registered resume cache into it.
+     *
+     * @param  context                  the Camel context whose registry and class resolver are used for the lookup
+     * @param  consumer                 the consumer whose class is used to locate the adapter configuration
+     * @return                          the resume adapter, with the {@code resumeCache} registry entry set on it if
+     *                                  the adapter is {@link Cacheable} and that entry is a {@link ResumeCache}
+     * @throws RuntimeException         if no adapter is registered and none can be resolved for the consumer
+     * @throws IllegalArgumentException if the resolved object is not a {@link ResumeAdapter}
+     */
+    public static ResumeAdapter eval(CamelContext context, Consumer consumer) {
+        assert context != null;
+        assert consumer != null;
+
+        // An adapter explicitly bound to the registry wins over the one auto-configured for the consumer
+        Object adapterInstance = context.getRegistry().lookupByName("resumeAdapter");
+        if (adapterInstance == null) {
+            adapterInstance = resolveAdapter(context, consumer);
+
+            if (adapterInstance == null) {
+                throw new RuntimeException("Cannot find a resume adapter class in the consumer classpath or in the registry");
+            }
+        }
+
+        if (adapterInstance instanceof ResumeAdapter) {
+            ResumeAdapter resumeAdapter = (ResumeAdapter) adapterInstance;
+
+            Object obj = context.getRegistry().lookupByName("resumeCache");
+            if (!(resumeAdapter instanceof Cacheable)) {
+                LOG.debug("The resume adapter {} is not cacheable", resumeAdapter.getClass().getName());
+            } else if (obj instanceof ResumeCache) {
+                ((Cacheable) resumeAdapter).setCache((ResumeCache<?>) obj);
+            } else {
+                // The adapter could use a cache, but the registry has no usable one: report that, not "not cacheable"
+                LOG.debug("No resume cache is available in the registry for the cacheable resume adapter {}",
+                        resumeAdapter.getClass().getName());
+            }
+
+            return resumeAdapter;
+        } else {
+            LOG.error("Invalid resume adapter type: {}", getType(adapterInstance));
+            throw new IllegalArgumentException("Invalid resume adapter type: " + getType(adapterInstance));
+        }
+    }
+
+    /**
+     * Instantiates the adapter class declared in the adapter configuration visible from the consumer class.
+     *
+     * @param  context  the Camel context providing the class resolver used to load the adapter class
+     * @param  consumer the consumer whose class is used to load the adapter configuration resource
+     * @return          a new adapter instance, or {@code null} if the configuration is missing, does not name an
+     *                  adapter class, the class resolver returns no class for it, or reading the configuration or
+     *                  instantiating the class fails (the failure is logged)
+     */
+    private static Object resolveAdapter(CamelContext context, Consumer consumer) {
+        try (InputStream adapterStream = consumer.getClass().getResourceAsStream(ADAPTER_PROPERTIES)) {
+
+            if (adapterStream == null) {
+                LOG.error("Cannot find a resume adapter class in the consumer {} classpath", consumer.getClass());
+                return null;
+            }
+
+            Properties properties = new Properties();
+            properties.load(adapterStream);
+
+            String adapterClass = properties.getProperty(PROP_ADAPTER_CLASS);
+
+            if (ObjectHelper.isEmpty(adapterClass)) {
+                LOG.error("A resume adapter class is not defined in the adapter configuration");
+
+                return null;
+            }
+
+            LOG.debug("About to load an adapter class {} for consumer {}", adapterClass, consumer.getClass());
+            Class<?> clazz = context.getClassResolver().resolveClass(adapterClass);
+            if (clazz == null) {
+                LOG.error("Cannot find the resume adapter class in the classpath {}", adapterClass);
+
+                return null;
+            }
+
+            return clazz.getDeclaredConstructor().newInstance();
+        } catch (IOException e) {
+            LOG.error("Unable to resolve the resume adapter due to I/O error: {}", e.getMessage(), e);
+        } catch (InvocationTargetException | InstantiationException | IllegalAccessException | NoSuchMethodException e) {
+            LOG.error("Unable to create a resume adapter instance: {}", e.getMessage(), e);
+        }
+
+        return null;
+    }
+
+    /**
+     * Describes the type of the given instance for log and error messages.
+     *
+     * @param  instance the object to describe, possibly {@code null}
+     * @return          the class of the instance, or the string {@code "null"} when there is no instance
+     */
+    private static Object getType(Object instance) {
+        return instance == null ? "null" : instance.getClass();
+    }
+}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/resume/OffsetKeys.java b/core/camel-support/src/main/java/org/apache/camel/support/resume/OffsetKeys.java
new file mode 100644
index 00000000000..4e3e640396d
--- /dev/null
+++ b/core/camel-support/src/main/java/org/apache/camel/support/resume/OffsetKeys.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.support.resume;
+
+import org.apache.camel.resume.OffsetKey;
+
+/**
+ * Utility class for creating {@link OffsetKey} instances
+ */
+public final class OffsetKeys {
+    /**
+     * A plain, mutable offset key holder
+     *
+     * @param <T> the type of the offset key
+     */
+    private static class AnonymousOffsetKey<T> implements OffsetKey<T> {
+        private T key;
+
+        public AnonymousOffsetKey() {
+        }
+
+        public AnonymousOffsetKey(T key) {
+            this.key = key;
+        }
+
+        @Override
+        public void setKey(T key) {
+            this.key = key;
+        }
+
+        @Override
+        public T getKey() {
+            return key;
+        }
+    }
+
+    /**
+     * An offset key holder whose key is fixed at construction time
+     *
+     * @param <T> the type of the offset key
+     */
+    private static class UnmodifiableOffsetKey<T> implements OffsetKey<T> {
+        private final T key;
+
+        public UnmodifiableOffsetKey(T key) {
+            this.key = key;
+        }
+
+        @Override
+        public void setKey(T key) {
+            throw new UnsupportedOperationException("This object is unmodifiable");
+        }
+
+        @Override
+        public T getKey() {
+            return key;
+        }
+    }
+
+    private OffsetKeys() {
+    }
+
+    /**
+     * Creates a new offset key wrapping the given object
+     *
+     * @param  <T>    the type of the object being wrapped
+     * @param  object the object to wrap in the offset key
+     * @return        a new OffsetKey object that wraps the given object
+     */
+    public static <T> OffsetKey<T> of(T object) {
+        return new AnonymousOffsetKey<>(object);
+    }
+
+    /**
+     * Creates a new unmodifiable offset key wrapping the given object
+     *
+     * @param  <T>    the type of the object being wrapped
+     * @param  object the object to wrap in the offset key
+     * @return        a new OffsetKey object that wraps the given object. Calling {@code setKey} on it throws an
+     *                {@link UnsupportedOperationException}.
+     */
+    public static <T> OffsetKey<T> unmodifiableOf(T object) {
+        return new UnmodifiableOffsetKey<>(object);
+    }
+
+    /**
+     * Creates a new empty OffsetKey object. The returned key holds {@code null} until a key is set on it.
+     * <p>
+     * The method is generic so that the caller picks the key type and can later call {@code setKey} without a cast;
+     * callers that assign the result to {@code OffsetKey<?>} continue to compile unchanged.
+     *
+     * @param  <T> the type of the key that the returned object will hold
+     * @return     an empty, modifiable OffsetKey object
+     */
+    public static <T> OffsetKey<T> empty() {
+        return new AnonymousOffsetKey<>();
+    }
+}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/resume/Offsets.java b/core/camel-support/src/main/java/org/apache/camel/support/resume/Offsets.java
index cc2b0195ac5..46403d4c66a 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/resume/Offsets.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/resume/Offsets.java
@@ -23,11 +23,23 @@ import org.apache.camel.resume.Offset;
  * Offset handling support
  */
 public final class Offsets {
+    private static class AnonymousOffset<T> implements Offset<T> {
+        private T offset;
 
-    /**
-     * Default initial offset when using long offsets
-     */
-    public static final Offset<Long> INITIAL_LONG = Offsets.of(0L);
+        public AnonymousOffset(T offset) {
+            this.offset = offset;
+        }
+
+        @Override
+        public void update(T offset) {
+            this.offset = offset;
+        }
+
+        @Override
+        public T offset() {
+            return offset;
+        }
+    }
 
     private Offsets() {
     }
@@ -40,7 +52,7 @@ public final class Offsets {
      * @return             A new Offset holder with the given offset value
      */
     public static <T> Offset<T> of(T offsetValue) {
-        return () -> offsetValue;
+        return new AnonymousOffset<>(offsetValue);
     }
 
     /**
@@ -54,7 +66,7 @@ public final class Offsets {
      */
     public static <T> Offset<T> ofNullable(T offsetValue, T defaultValue) {
         if (offsetValue != null) {
-            return () -> offsetValue;
+            return new AnonymousOffset<>(offsetValue);
         }
 
         return Offsets.of(defaultValue);
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/resume/Resumables.java b/core/camel-support/src/main/java/org/apache/camel/support/resume/Resumables.java
index 56698545bc6..7478735d739 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/resume/Resumables.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/resume/Resumables.java
@@ -17,7 +17,12 @@
 
 package org.apache.camel.support.resume;
 
+import java.lang.reflect.Array;
+import java.util.Arrays;
+import java.util.function.Predicate;
+
 import org.apache.camel.resume.Offset;
+import org.apache.camel.resume.OffsetKey;
 import org.apache.camel.resume.Resumable;
 
 /**
@@ -31,7 +36,7 @@ public final class Resumables {
      * @param <K> the type of the key, name or object that can be addressed by 
the given offset (aka addressable)
      * @param <V> the type of offset
      */
-    private static class AnonymousResumable<K, V> implements Resumable<K, V> {
+    private static class AnonymousResumable<K, V> implements Resumable {
         private final K addressable;
         private V offset;
 
@@ -55,19 +60,26 @@ public final class Resumables {
             this.offset = offset;
         }
 
-        @Override
-        public void updateLastOffset(V offset) {
-            this.offset = offset;
-        }
-
         @Override
         public Offset<V> getLastOffset() {
             return Offsets.of(offset);
         }
 
         @Override
-        public K getAddressable() {
-            return addressable;
+        public OffsetKey<K> getOffsetKey() {
+            return new OffsetKey<>() {
+                private final K key = addressable;
+
+                @Override
+                public void setKey(K key) {
+                    throw new UnsupportedOperationException("Setting offset 
keys for anonymous resumables is unsupported");
+                }
+
+                @Override
+                public K getKey() {
+                    return key;
+                }
+            };
         }
     }
 
@@ -85,7 +97,36 @@ public final class Resumables {
      * @param  <V>         the type of offset
      * @return             A new resumable entity for the given addressable 
with the given offset value
      */
-    public static <K, V> Resumable<K, V> of(K addressable, V offset) {
+    public static <K, V> Resumable of(K addressable, V offset) {
         return new AnonymousResumable<>(addressable, offset);
     }
+
+    /**
+     * Filters the given input, keeping only the entries that should be resumed.
+     *
+     * @param  <T>            the type of the entries in the input array
+     * @param  input          the input array to check for resumables
+     * @param  resumableCheck a checker method that returns true if a single entry of the input should be resumed or
+     *                        false otherwise. For instance: given a set A, B and C, where B has already been processed,
+     *                        then a test for A and C returns true, whereas a test for B returns false. A checker
+     *                        declared for a supertype of the entries is accepted as well.
+     * @return                the input array itself if every entry should be resumed; otherwise a new array, with the
+     *                        same component type as the input, containing the entries that still need to be processed
+     *                        in their original order
+     */
+    public static <T> T[] resumeEach(T[] input, Predicate<? super T> resumableCheck) {
+        // Safe: the array is created with the component type of the input array, so it can only hold T entries
+        @SuppressWarnings("unchecked")
+        T[] tmp = (T[]) Array.newInstance(input.getClass().getComponentType(), input.length);
+        int count = 0;
+
+        for (T entry : input) {
+            if (resumableCheck.test(entry)) {
+                tmp[count] = entry;
+                count++;
+            }
+        }
+
+        // Trim the unused tail only when something was filtered out
+        if (count != input.length) {
+            return Arrays.copyOf(tmp, count);
+        }
+
+        return input;
+    }
+
 }

Reply via email to