This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new ea6885d  CAMEL-17186: added support for resume strategies for AWS2 
Kinesis
ea6885d is described below

commit ea6885d1c997a6da761d672fcc8dc285b6e551c9
Author: Otavio Rodolfo Piske <opi...@redhat.com>
AuthorDate: Wed Nov 10 13:30:54 2021 +0100

    CAMEL-17186: added support for resume strategies for AWS2 Kinesis
---
 .../aws2/kinesis/Kinesis2ComponentConfigurer.java  |  6 +++
 .../aws2/kinesis/Kinesis2EndpointConfigurer.java   |  6 +++
 .../aws2/kinesis/Kinesis2EndpointUriFactory.java   |  3 +-
 .../camel/component/aws2/kinesis/aws2-kinesis.json |  2 +
 .../aws2/kinesis/Kinesis2Configuration.java        | 14 +++++++
 .../component/aws2/kinesis/Kinesis2Consumer.java   | 23 +++++++-----
 .../kinesis/consumer/KinesisResumeStrategy.java    | 25 +++++++++++++
 .../KinesisUserConfigurationResumeStrategy.java    | 43 ++++++++++++++++++++++
 .../dsl/Aws2KinesisComponentBuilderFactory.java    | 19 ++++++++++
 .../dsl/Kinesis2EndpointBuilderFactory.java        | 36 ++++++++++++++++++
 10 files changed, 167 insertions(+), 10 deletions(-)

diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2ComponentConfigurer.java
 
b/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2ComponentConfigurer.java
index 1a15fdc..97bb77a 100644
--- 
a/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2ComponentConfigurer.java
+++ 
b/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2ComponentConfigurer.java
@@ -54,6 +54,8 @@ public class Kinesis2ComponentConfigurer extends 
PropertyConfigurerSupport imple
         case "proxyprotocol":
         case "proxyProtocol": 
getOrCreateConfiguration(target).setProxyProtocol(property(camelContext, 
software.amazon.awssdk.core.Protocol.class, value)); return true;
         case "region": 
getOrCreateConfiguration(target).setRegion(property(camelContext, 
java.lang.String.class, value)); return true;
+        case "resumestrategy":
+        case "resumeStrategy": 
getOrCreateConfiguration(target).setResumeStrategy(property(camelContext, 
org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy.class, 
value)); return true;
         case "secretkey":
         case "secretKey": 
getOrCreateConfiguration(target).setSecretKey(property(camelContext, 
java.lang.String.class, value)); return true;
         case "sequencenumber":
@@ -106,6 +108,8 @@ public class Kinesis2ComponentConfigurer extends 
PropertyConfigurerSupport imple
         case "proxyprotocol":
         case "proxyProtocol": return 
software.amazon.awssdk.core.Protocol.class;
         case "region": return java.lang.String.class;
+        case "resumestrategy":
+        case "resumeStrategy": return 
org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy.class;
         case "secretkey":
         case "secretKey": return java.lang.String.class;
         case "sequencenumber":
@@ -154,6 +158,8 @@ public class Kinesis2ComponentConfigurer extends 
PropertyConfigurerSupport imple
         case "proxyprotocol":
         case "proxyProtocol": return 
getOrCreateConfiguration(target).getProxyProtocol();
         case "region": return getOrCreateConfiguration(target).getRegion();
+        case "resumestrategy":
+        case "resumeStrategy": return 
getOrCreateConfiguration(target).getResumeStrategy();
         case "secretkey":
         case "secretKey": return 
getOrCreateConfiguration(target).getSecretKey();
         case "sequencenumber":
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2EndpointConfigurer.java
 
b/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2EndpointConfigurer.java
index b165e71..00e68d0 100644
--- 
a/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2EndpointConfigurer.java
+++ 
b/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2EndpointConfigurer.java
@@ -62,6 +62,8 @@ public class Kinesis2EndpointConfigurer extends 
PropertyConfigurerSupport implem
         case "region": 
target.getConfiguration().setRegion(property(camelContext, 
java.lang.String.class, value)); return true;
         case "repeatcount":
         case "repeatCount": target.setRepeatCount(property(camelContext, 
long.class, value)); return true;
+        case "resumestrategy":
+        case "resumeStrategy": 
target.getConfiguration().setResumeStrategy(property(camelContext, 
org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy.class, 
value)); return true;
         case "runlogginglevel":
         case "runLoggingLevel": 
target.setRunLoggingLevel(property(camelContext, 
org.apache.camel.LoggingLevel.class, value)); return true;
         case "scheduledexecutorservice":
@@ -144,6 +146,8 @@ public class Kinesis2EndpointConfigurer extends 
PropertyConfigurerSupport implem
         case "region": return java.lang.String.class;
         case "repeatcount":
         case "repeatCount": return long.class;
+        case "resumestrategy":
+        case "resumeStrategy": return 
org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy.class;
         case "runlogginglevel":
         case "runLoggingLevel": return org.apache.camel.LoggingLevel.class;
         case "scheduledexecutorservice":
@@ -222,6 +226,8 @@ public class Kinesis2EndpointConfigurer extends 
PropertyConfigurerSupport implem
         case "region": return target.getConfiguration().getRegion();
         case "repeatcount":
         case "repeatCount": return target.getRepeatCount();
+        case "resumestrategy":
+        case "resumeStrategy": return 
target.getConfiguration().getResumeStrategy();
         case "runlogginglevel":
         case "runLoggingLevel": return target.getRunLoggingLevel();
         case "scheduledexecutorservice":
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2EndpointUriFactory.java
 
b/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2EndpointUriFactory.java
index 1ba2211..5dafce0 100644
--- 
a/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2EndpointUriFactory.java
+++ 
b/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2EndpointUriFactory.java
@@ -20,7 +20,7 @@ public class Kinesis2EndpointUriFactory extends 
org.apache.camel.support.compone
     private static final Set<String> PROPERTY_NAMES;
     private static final Set<String> SECRET_PROPERTY_NAMES;
     static {
-        Set<String> props = new HashSet<>(38);
+        Set<String> props = new HashSet<>(39);
         props.add("backoffMultiplier");
         props.add("initialDelay");
         props.add("scheduler");
@@ -56,6 +56,7 @@ public class Kinesis2EndpointUriFactory extends 
org.apache.camel.support.compone
         props.add("startScheduler");
         props.add("accessKey");
         props.add("overrideEndpoint");
+        props.add("resumeStrategy");
         props.add("maxResultsPerRequest");
         props.add("region");
         props.add("exceptionHandler");
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/generated/resources/org/apache/camel/component/aws2/kinesis/aws2-kinesis.json
 
b/components/camel-aws/camel-aws2-kinesis/src/generated/resources/org/apache/camel/component/aws2/kinesis/aws2-kinesis.json
index bc7981b..51a2f4c 100644
--- 
a/components/camel-aws/camel-aws2-kinesis/src/generated/resources/org/apache/camel/component/aws2/kinesis/aws2-kinesis.json
+++ 
b/components/camel-aws/camel-aws2-kinesis/src/generated/resources/org/apache/camel/component/aws2/kinesis/aws2-kinesis.json
@@ -36,6 +36,7 @@
     "bridgeErrorHandler": { "kind": "property", "displayName": "Bridge Error 
Handler", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "description": "Allows for bridging the 
consumer to the Camel routing Error Handler, which mean any exceptions occurred 
while the consumer is trying to pickup incoming messages, or the likes, will 
now be processed as a me [...]
     "iteratorType": { "kind": "property", "displayName": "Iterator Type", 
"group": "consumer", "label": "consumer", "required": false, "type": "object", 
"javaType": "software.amazon.awssdk.services.kinesis.model.ShardIteratorType", 
"enum": [ "AT_SEQUENCE_NUMBER", "AFTER_SEQUENCE_NUMBER", "TRIM_HORIZON", 
"LATEST", "AT_TIMESTAMP", "null" ], "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": "TRIM_HORIZON", "configurationClass": 
"org.apache.camel.component.aws2.kinesi [...]
     "maxResultsPerRequest": { "kind": "property", "displayName": "Max Results 
Per Request", "group": "consumer", "label": "consumer", "required": false, 
"type": "integer", "javaType": "int", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": 1, "configurationClass": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", 
"configurationField": "configuration", "description": "Maximum number of 
records that will be fetched in each poll" },
+    "resumeStrategy": { "kind": "property", "displayName": "Resume Strategy", 
"group": "consumer", "label": "consumer", "required": false, "type": "object", 
"javaType": 
"org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 
"KinesisUserConfigurationResumeStrategy", "configurationClass": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", 
"configurationField": "configuration", "descri [...]
     "sequenceNumber": { "kind": "property", "displayName": "Sequence Number", 
"group": "consumer", "label": "consumer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", 
"configurationField": "configuration", "description": "The sequence number to 
start polling from. Required if iteratorType is set to AFTER_SEQUENCE_NUMBER or 
 [...]
     "shardClosed": { "kind": "property", "displayName": "Shard Closed", 
"group": "consumer", "label": "consumer", "required": false, "type": "object", 
"javaType": 
"org.apache.camel.component.aws2.kinesis.Kinesis2ShardClosedStrategyEnum", 
"enum": [ "ignore", "fail", "silent" ], "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "ignore", "configurationClass": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", 
"configurationField": "configuration", "des [...]
     "shardId": { "kind": "property", "displayName": "Shard Id", "group": 
"consumer", "label": "consumer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", 
"configurationField": "configuration", "description": "Defines which shardId in 
the Kinesis stream to get records from" },
@@ -59,6 +60,7 @@
     "bridgeErrorHandler": { "kind": "parameter", "displayName": "Bridge Error 
Handler", "group": "consumer", "label": "consumer", "required": false, "type": 
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": false, "description": "Allows for bridging the 
consumer to the Camel routing Error Handler, which mean any exceptions occurred 
while the consumer is trying to pickup incoming messages, or the likes, will 
now be processed as a m [...]
     "iteratorType": { "kind": "parameter", "displayName": "Iterator Type", 
"group": "consumer", "label": "consumer", "required": false, "type": "object", 
"javaType": "software.amazon.awssdk.services.kinesis.model.ShardIteratorType", 
"enum": [ "AT_SEQUENCE_NUMBER", "AFTER_SEQUENCE_NUMBER", "TRIM_HORIZON", 
"LATEST", "AT_TIMESTAMP", "null" ], "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": "TRIM_HORIZON", "configurationClass": 
"org.apache.camel.component.aws2.kines [...]
     "maxResultsPerRequest": { "kind": "parameter", "displayName": "Max Results 
Per Request", "group": "consumer", "label": "consumer", "required": false, 
"type": "integer", "javaType": "int", "deprecated": false, "autowired": false, 
"secret": false, "defaultValue": 1, "configurationClass": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", 
"configurationField": "configuration", "description": "Maximum number of 
records that will be fetched in each poll" },
+    "resumeStrategy": { "kind": "parameter", "displayName": "Resume Strategy", 
"group": "consumer", "label": "consumer", "required": false, "type": "object", 
"javaType": 
"org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 
"KinesisUserConfigurationResumeStrategy", "configurationClass": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", 
"configurationField": "configuration", "descr [...]
     "sendEmptyMessageWhenIdle": { "kind": "parameter", "displayName": "Send 
Empty Message When Idle", "group": "consumer", "label": "consumer", "required": 
false, "type": "boolean", "javaType": "boolean", "deprecated": false, 
"autowired": false, "secret": false, "defaultValue": false, "description": "If 
the polling consumer did not poll any files, you can enable this option to send 
an empty message (no body) instead." },
     "sequenceNumber": { "kind": "parameter", "displayName": "Sequence Number", 
"group": "consumer", "label": "consumer", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": false, "configurationClass": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", 
"configurationField": "configuration", "description": "The sequence number to 
start polling from. Required if iteratorType is set to AFTER_SEQUENCE_NUMBER or 
[...]
     "shardClosed": { "kind": "parameter", "displayName": "Shard Closed", 
"group": "consumer", "label": "consumer", "required": false, "type": "object", 
"javaType": 
"org.apache.camel.component.aws2.kinesis.Kinesis2ShardClosedStrategyEnum", 
"enum": [ "ignore", "fail", "silent" ], "deprecated": false, "autowired": 
false, "secret": false, "defaultValue": "ignore", "configurationClass": 
"org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", 
"configurationField": "configuration", "de [...]
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java
 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java
index db57da4..0ded74b 100644
--- 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java
+++ 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.aws2.kinesis;
 
 import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriParams;
@@ -83,6 +84,11 @@ public class Kinesis2Configuration implements Cloneable {
                             "static credentials to be passed in.")
     private boolean useDefaultCredentialsProvider;
 
+    @UriParam(label = "consumer",
+              description = "Defines a resume strategy for AWS Kinesis. The 
default strategy reads the sequenceNumber if provided",
+              defaultValue = "KinesisUserConfigurationResumeStrategy")
+    private KinesisResumeStrategy resumeStrategy;
+
     public KinesisClient getAmazonKinesisClient() {
         return amazonKinesisClient;
     }
@@ -227,6 +233,14 @@ public class Kinesis2Configuration implements Cloneable {
         this.useDefaultCredentialsProvider = useDefaultCredentialsProvider;
     }
 
+    public KinesisResumeStrategy getResumeStrategy() {
+        return resumeStrategy;
+    }
+
+    public void setResumeStrategy(KinesisResumeStrategy resumeStrategy) {
+        this.resumeStrategy = resumeStrategy;
+    }
+
     // *************************************************
     //
     // *************************************************
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
index 3446612..44cec89 100644
--- 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
+++ 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
@@ -23,6 +23,8 @@ import java.util.Queue;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy;
+import 
org.apache.camel.component.aws2.kinesis.consumer.KinesisUserConfigurationResumeStrategy;
 import org.apache.camel.support.ScheduledBatchPollingConsumer;
 import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.ObjectHelper;
@@ -37,7 +39,6 @@ import 
software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
 import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
 import software.amazon.awssdk.services.kinesis.model.Record;
 import software.amazon.awssdk.services.kinesis.model.Shard;
-import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
 public class Kinesis2Consumer extends ScheduledBatchPollingConsumer {
 
@@ -160,9 +161,7 @@ public class Kinesis2Consumer extends 
ScheduledBatchPollingConsumer {
                     
.streamName(getEndpoint().getConfiguration().getStreamName()).shardId(shardId)
                     
.shardIteratorType(getEndpoint().getConfiguration().getIteratorType());
 
-            if (hasSequenceNumber()) {
-                
req.startingSequenceNumber(getEndpoint().getConfiguration().getSequenceNumber());
-            }
+            resume(req);
 
             GetShardIteratorResponse result = 
getClient().getShardIterator(req.build());
             currentShardIterator = result.shardIterator();
@@ -171,6 +170,17 @@ public class Kinesis2Consumer extends 
ScheduledBatchPollingConsumer {
         return currentShardIterator;
     }
 
+    private void resume(GetShardIteratorRequest.Builder req) {
+        KinesisResumeStrategy resumeStrategy;
+        if (getEndpoint().getConfiguration().getResumeStrategy() == null) {
+            resumeStrategy = new 
KinesisUserConfigurationResumeStrategy(getEndpoint().getConfiguration());
+        } else {
+            resumeStrategy = 
getEndpoint().getConfiguration().getResumeStrategy();
+        }
+
+        resumeStrategy.resume(req);
+    }
+
     private Queue<Exchange> createExchanges(List<Record> records) {
         Queue<Exchange> exchanges = new ArrayDeque<>();
         for (Record record : records) {
@@ -192,9 +202,4 @@ public class Kinesis2Consumer extends 
ScheduledBatchPollingConsumer {
         return exchange;
     }
 
-    private boolean hasSequenceNumber() {
-        return !getEndpoint().getConfiguration().getSequenceNumber().isEmpty()
-                && 
(getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
-                        || 
getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER));
-    }
 }
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeStrategy.java
 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeStrategy.java
new file mode 100644
index 0000000..7e0e5bc
--- /dev/null
+++ 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeStrategy.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.aws2.kinesis.consumer;
+
+import org.apache.camel.ResumeStrategy;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
+
+public interface KinesisResumeStrategy extends 
ResumeStrategy<GetShardIteratorRequest.Builder> {
+
+}
diff --git 
a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisUserConfigurationResumeStrategy.java
 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisUserConfigurationResumeStrategy.java
new file mode 100644
index 0000000..4d78990
--- /dev/null
+++ 
b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisUserConfigurationResumeStrategy.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.aws2.kinesis.consumer;
+
+import org.apache.camel.component.aws2.kinesis.Kinesis2Configuration;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+
+public class KinesisUserConfigurationResumeStrategy implements 
KinesisResumeStrategy {
+    private final Kinesis2Configuration configuration;
+
+    public KinesisUserConfigurationResumeStrategy(Kinesis2Configuration 
configuration) {
+        this.configuration = configuration;
+    }
+
+    private boolean hasSequenceNumber() {
+        return !configuration.getSequenceNumber().isEmpty()
+                && 
(configuration.getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
+                        || 
configuration.getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER));
+    }
+
+    @Override
+    public void resume(GetShardIteratorRequest.Builder resumable) {
+        if (hasSequenceNumber()) {
+            
resumable.startingSequenceNumber(configuration.getSequenceNumber());
+        }
+    }
+}
diff --git 
a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2KinesisComponentBuilderFactory.java
 
b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2KinesisComponentBuilderFactory.java
index c6ac5e1..061c1c0 100644
--- 
a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2KinesisComponentBuilderFactory.java
+++ 
b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2KinesisComponentBuilderFactory.java
@@ -286,6 +286,24 @@ public interface Aws2KinesisComponentBuilderFactory {
             return this;
         }
         /**
+         * Defines a resume strategy for AWS Kinesis. The default strategy 
reads
+         * the sequenceNumber if provided.
+         * 
+         * The option is a:
+         * 
&lt;code&gt;org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy&lt;/code&gt;
 type.
+         * 
+         * Default: KinesisUserConfigurationResumeStrategy
+         * Group: consumer
+         * 
+         * @param resumeStrategy the value to set
+         * @return the dsl builder
+         */
+        default Aws2KinesisComponentBuilder resumeStrategy(
+                
org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy 
resumeStrategy) {
+            doSetProperty("resumeStrategy", resumeStrategy);
+            return this;
+        }
+        /**
          * The sequence number to start polling from. Required if iteratorType
          * is set to AFTER_SEQUENCE_NUMBER or AT_SEQUENCE_NUMBER.
          * 
@@ -448,6 +466,7 @@ public interface Aws2KinesisComponentBuilderFactory {
             case "bridgeErrorHandler": ((Kinesis2Component) 
component).setBridgeErrorHandler((boolean) value); return true;
             case "iteratorType": getOrCreateConfiguration((Kinesis2Component) 
component).setIteratorType((software.amazon.awssdk.services.kinesis.model.ShardIteratorType)
 value); return true;
             case "maxResultsPerRequest": 
getOrCreateConfiguration((Kinesis2Component) 
component).setMaxResultsPerRequest((int) value); return true;
+            case "resumeStrategy": 
getOrCreateConfiguration((Kinesis2Component) 
component).setResumeStrategy((org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy)
 value); return true;
             case "sequenceNumber": 
getOrCreateConfiguration((Kinesis2Component) 
component).setSequenceNumber((java.lang.String) value); return true;
             case "shardClosed": getOrCreateConfiguration((Kinesis2Component) 
component).setShardClosed((org.apache.camel.component.aws2.kinesis.Kinesis2ShardClosedStrategyEnum)
 value); return true;
             case "shardId": getOrCreateConfiguration((Kinesis2Component) 
component).setShardId((java.lang.String) value); return true;
diff --git 
a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Kinesis2EndpointBuilderFactory.java
 
b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Kinesis2EndpointBuilderFactory.java
index 958879b8..77f6817 100644
--- 
a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Kinesis2EndpointBuilderFactory.java
+++ 
b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Kinesis2EndpointBuilderFactory.java
@@ -438,6 +438,42 @@ public interface Kinesis2EndpointBuilderFactory {
             return this;
         }
         /**
+         * Defines a resume strategy for AWS Kinesis. The default strategy 
reads
+         * the sequenceNumber if provided.
+         * 
+         * The option is a:
+         * 
&lt;code&gt;org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy&lt;/code&gt;
 type.
+         * 
+         * Default: KinesisUserConfigurationResumeStrategy
+         * Group: consumer
+         * 
+         * @param resumeStrategy the value to set
+         * @return the dsl builder
+         */
+        default Kinesis2EndpointConsumerBuilder resumeStrategy(
+                Object resumeStrategy) {
+            doSetProperty("resumeStrategy", resumeStrategy);
+            return this;
+        }
+        /**
+         * Defines a resume strategy for AWS Kinesis. The default strategy 
reads
+         * the sequenceNumber if provided.
+         * 
+         * The option will be converted to a
+         * 
&lt;code&gt;org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy&lt;/code&gt;
 type.
+         * 
+         * Default: KinesisUserConfigurationResumeStrategy
+         * Group: consumer
+         * 
+         * @param resumeStrategy the value to set
+         * @return the dsl builder
+         */
+        default Kinesis2EndpointConsumerBuilder resumeStrategy(
+                String resumeStrategy) {
+            doSetProperty("resumeStrategy", resumeStrategy);
+            return this;
+        }
+        /**
          * If the polling consumer did not poll any files, you can enable this
          * option to send an empty message (no body) instead.
          * 

Reply via email to