This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new ea6885d CAMEL-17186: added support for resume strategies for AWS2 Kinesis ea6885d is described below commit ea6885d1c997a6da761d672fcc8dc285b6e551c9 Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Wed Nov 10 13:30:54 2021 +0100 CAMEL-17186: added support for resume strategies for AWS2 Kinesis --- .../aws2/kinesis/Kinesis2ComponentConfigurer.java | 6 +++ .../aws2/kinesis/Kinesis2EndpointConfigurer.java | 6 +++ .../aws2/kinesis/Kinesis2EndpointUriFactory.java | 3 +- .../camel/component/aws2/kinesis/aws2-kinesis.json | 2 + .../aws2/kinesis/Kinesis2Configuration.java | 14 +++++++ .../component/aws2/kinesis/Kinesis2Consumer.java | 23 +++++++----- .../kinesis/consumer/KinesisResumeStrategy.java | 25 +++++++++++++ .../KinesisUserConfigurationResumeStrategy.java | 43 ++++++++++++++++++++++ .../dsl/Aws2KinesisComponentBuilderFactory.java | 19 ++++++++++ .../dsl/Kinesis2EndpointBuilderFactory.java | 36 ++++++++++++++++++ 10 files changed, 167 insertions(+), 10 deletions(-) diff --git a/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2ComponentConfigurer.java b/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2ComponentConfigurer.java index 1a15fdc..97bb77a 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2ComponentConfigurer.java +++ b/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2ComponentConfigurer.java @@ -54,6 +54,8 @@ public class Kinesis2ComponentConfigurer extends PropertyConfigurerSupport imple case "proxyprotocol": case "proxyProtocol": getOrCreateConfiguration(target).setProxyProtocol(property(camelContext, software.amazon.awssdk.core.Protocol.class, value)); return true; case "region": getOrCreateConfiguration(target).setRegion(property(camelContext, java.lang.String.class, value)); return true; + case "resumestrategy": + case "resumeStrategy": getOrCreateConfiguration(target).setResumeStrategy(property(camelContext, org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy.class, value)); return true; case "secretkey": case "secretKey": getOrCreateConfiguration(target).setSecretKey(property(camelContext, java.lang.String.class, value)); return true; case "sequencenumber": @@ -106,6 +108,8 @@ public class Kinesis2ComponentConfigurer extends PropertyConfigurerSupport imple case "proxyprotocol": case "proxyProtocol": return software.amazon.awssdk.core.Protocol.class; case "region": return java.lang.String.class; + case "resumestrategy": + case "resumeStrategy": return org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy.class; case "secretkey": case "secretKey": return java.lang.String.class; case "sequencenumber": @@ -154,6 +158,8 @@ public class Kinesis2ComponentConfigurer extends PropertyConfigurerSupport imple case "proxyprotocol": case "proxyProtocol": return getOrCreateConfiguration(target).getProxyProtocol(); case "region": return getOrCreateConfiguration(target).getRegion(); + case "resumestrategy": + case "resumeStrategy": return getOrCreateConfiguration(target).getResumeStrategy(); case "secretkey": case "secretKey": return getOrCreateConfiguration(target).getSecretKey(); case "sequencenumber": diff --git a/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2EndpointConfigurer.java b/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2EndpointConfigurer.java index b165e71..00e68d0 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2EndpointConfigurer.java +++ b/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2EndpointConfigurer.java @@ -62,6 +62,8 @@ public class Kinesis2EndpointConfigurer extends PropertyConfigurerSupport implem case "region": target.getConfiguration().setRegion(property(camelContext, java.lang.String.class, value)); return true; case "repeatcount": case "repeatCount": target.setRepeatCount(property(camelContext, long.class, value)); return true; + case "resumestrategy": + case "resumeStrategy": target.getConfiguration().setResumeStrategy(property(camelContext, org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy.class, value)); return true; case "runlogginglevel": case "runLoggingLevel": target.setRunLoggingLevel(property(camelContext, org.apache.camel.LoggingLevel.class, value)); return true; case "scheduledexecutorservice": @@ -144,6 +146,8 @@ public class Kinesis2EndpointConfigurer extends PropertyConfigurerSupport implem case "region": return java.lang.String.class; case "repeatcount": case "repeatCount": return long.class; + case "resumestrategy": + case "resumeStrategy": return org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy.class; case "runlogginglevel": case "runLoggingLevel": return org.apache.camel.LoggingLevel.class; case "scheduledexecutorservice": @@ -222,6 +226,8 @@ public class Kinesis2EndpointConfigurer extends PropertyConfigurerSupport implem case "region": return target.getConfiguration().getRegion(); case "repeatcount": case "repeatCount": return target.getRepeatCount(); + case "resumestrategy": + case "resumeStrategy": return target.getConfiguration().getResumeStrategy(); case "runlogginglevel": case "runLoggingLevel": return target.getRunLoggingLevel(); case "scheduledexecutorservice": diff --git a/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2EndpointUriFactory.java b/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2EndpointUriFactory.java index 1ba2211..5dafce0 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2EndpointUriFactory.java +++ b/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2EndpointUriFactory.java @@ -20,7 +20,7 @@ public class Kinesis2EndpointUriFactory extends org.apache.camel.support.compone private static final Set<String> PROPERTY_NAMES; private static final Set<String> SECRET_PROPERTY_NAMES; static { - Set<String> props = new HashSet<>(38); + Set<String> props = new HashSet<>(39); props.add("backoffMultiplier"); props.add("initialDelay"); props.add("scheduler"); @@ -56,6 +56,7 @@ public class Kinesis2EndpointUriFactory extends org.apache.camel.support.compone props.add("startScheduler"); props.add("accessKey"); props.add("overrideEndpoint"); + props.add("resumeStrategy"); props.add("maxResultsPerRequest"); props.add("region"); props.add("exceptionHandler"); diff --git a/components/camel-aws/camel-aws2-kinesis/src/generated/resources/org/apache/camel/component/aws2/kinesis/aws2-kinesis.json b/components/camel-aws/camel-aws2-kinesis/src/generated/resources/org/apache/camel/component/aws2/kinesis/aws2-kinesis.json index bc7981b..51a2f4c 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/generated/resources/org/apache/camel/component/aws2/kinesis/aws2-kinesis.json +++ b/components/camel-aws/camel-aws2-kinesis/src/generated/resources/org/apache/camel/component/aws2/kinesis/aws2-kinesis.json @@ -36,6 +36,7 @@ "bridgeErrorHandler": { "kind": "property", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a me [...] "iteratorType": { "kind": "property", "displayName": "Iterator Type", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "software.amazon.awssdk.services.kinesis.model.ShardIteratorType", "enum": [ "AT_SEQUENCE_NUMBER", "AFTER_SEQUENCE_NUMBER", "TRIM_HORIZON", "LATEST", "AT_TIMESTAMP", "null" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "TRIM_HORIZON", "configurationClass": "org.apache.camel.component.aws2.kinesi [...] "maxResultsPerRequest": { "kind": "property", "displayName": "Max Results Per Request", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1, "configurationClass": "org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", "configurationField": "configuration", "description": "Maximum number of records that will be fetched in each poll" }, + "resumeStrategy": { "kind": "property", "displayName": "Resume Strategy", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "KinesisUserConfigurationResumeStrategy", "configurationClass": "org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", "configurationField": "configuration", "descri [...] "sequenceNumber": { "kind": "property", "displayName": "Sequence Number", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", "configurationField": "configuration", "description": "The sequence number to start polling from. Required if iteratorType is set to AFTER_SEQUENCE_NUMBER or [...] "shardClosed": { "kind": "property", "displayName": "Shard Closed", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.aws2.kinesis.Kinesis2ShardClosedStrategyEnum", "enum": [ "ignore", "fail", "silent" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "ignore", "configurationClass": "org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", "configurationField": "configuration", "des [...] "shardId": { "kind": "property", "displayName": "Shard Id", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", "configurationField": "configuration", "description": "Defines which shardId in the Kinesis stream to get records from" }, @@ -59,6 +60,7 @@ "bridgeErrorHandler": { "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a m [...] "iteratorType": { "kind": "parameter", "displayName": "Iterator Type", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "software.amazon.awssdk.services.kinesis.model.ShardIteratorType", "enum": [ "AT_SEQUENCE_NUMBER", "AFTER_SEQUENCE_NUMBER", "TRIM_HORIZON", "LATEST", "AT_TIMESTAMP", "null" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "TRIM_HORIZON", "configurationClass": "org.apache.camel.component.aws2.kines [...] "maxResultsPerRequest": { "kind": "parameter", "displayName": "Max Results Per Request", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1, "configurationClass": "org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", "configurationField": "configuration", "description": "Maximum number of records that will be fetched in each poll" }, + "resumeStrategy": { "kind": "parameter", "displayName": "Resume Strategy", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "KinesisUserConfigurationResumeStrategy", "configurationClass": "org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", "configurationField": "configuration", "descr [...] "sendEmptyMessageWhenIdle": { "kind": "parameter", "displayName": "Send Empty Message When Idle", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If the polling consumer did not poll any files, you can enable this option to send an empty message (no body) instead." }, "sequenceNumber": { "kind": "parameter", "displayName": "Sequence Number", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", "configurationField": "configuration", "description": "The sequence number to start polling from. Required if iteratorType is set to AFTER_SEQUENCE_NUMBER or [...] "shardClosed": { "kind": "parameter", "displayName": "Shard Closed", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.aws2.kinesis.Kinesis2ShardClosedStrategyEnum", "enum": [ "ignore", "fail", "silent" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "ignore", "configurationClass": "org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", "configurationField": "configuration", "de [...] diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java index db57da4..0ded74b 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java @@ -17,6 +17,7 @@ package org.apache.camel.component.aws2.kinesis; import org.apache.camel.RuntimeCamelException; +import org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriParams; @@ -83,6 +84,11 @@ public class Kinesis2Configuration implements Cloneable { "static credentials to be passed in.") private boolean useDefaultCredentialsProvider; + @UriParam(label = "consumer", + description = "Defines a resume strategy for AWS Kinesis. The default strategy reads the sequenceNumber if provided", + defaultValue = "KinesisUserConfigurationResumeStrategy") + private KinesisResumeStrategy resumeStrategy; + public KinesisClient getAmazonKinesisClient() { return amazonKinesisClient; } @@ -227,6 +233,14 @@ public class Kinesis2Configuration implements Cloneable { this.useDefaultCredentialsProvider = useDefaultCredentialsProvider; } + public KinesisResumeStrategy getResumeStrategy() { + return resumeStrategy; + } + + public void setResumeStrategy(KinesisResumeStrategy resumeStrategy) { + this.resumeStrategy = resumeStrategy; + } + // ************************************************* // // ************************************************* diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java index 3446612..44cec89 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java @@ -23,6 +23,8 @@ import java.util.Queue; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Processor; +import org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy; +import org.apache.camel.component.aws2.kinesis.consumer.KinesisUserConfigurationResumeStrategy; import org.apache.camel.support.ScheduledBatchPollingConsumer; import org.apache.camel.util.CastUtils; import org.apache.camel.util.ObjectHelper; @@ -37,7 +39,6 @@ import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse; import software.amazon.awssdk.services.kinesis.model.Record; import software.amazon.awssdk.services.kinesis.model.Shard; -import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; public class Kinesis2Consumer extends ScheduledBatchPollingConsumer { @@ -160,9 +161,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer { .streamName(getEndpoint().getConfiguration().getStreamName()).shardId(shardId) .shardIteratorType(getEndpoint().getConfiguration().getIteratorType()); - if (hasSequenceNumber()) { - req.startingSequenceNumber(getEndpoint().getConfiguration().getSequenceNumber()); - } + resume(req); GetShardIteratorResponse result = getClient().getShardIterator(req.build()); currentShardIterator = result.shardIterator(); @@ -171,6 +170,17 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer { return currentShardIterator; } + private void resume(GetShardIteratorRequest.Builder req) { + KinesisResumeStrategy resumeStrategy; + if (getEndpoint().getConfiguration().getResumeStrategy() == null) { + resumeStrategy = new KinesisUserConfigurationResumeStrategy(getEndpoint().getConfiguration()); + } else { + resumeStrategy = getEndpoint().getConfiguration().getResumeStrategy(); + } + + resumeStrategy.resume(req); + } + private Queue<Exchange> createExchanges(List<Record> records) { Queue<Exchange> exchanges = new ArrayDeque<>(); for (Record record : records) { @@ -192,9 +202,4 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer { return exchange; } - private boolean hasSequenceNumber() { - return !getEndpoint().getConfiguration().getSequenceNumber().isEmpty() - && (getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) - || getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER)); - } } diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeStrategy.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeStrategy.java new file mode 100644 index 0000000..7e0e5bc --- /dev/null +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisResumeStrategy.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.aws2.kinesis.consumer; + +import org.apache.camel.ResumeStrategy; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; + +public interface KinesisResumeStrategy extends ResumeStrategy<GetShardIteratorRequest.Builder> { + +} diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisUserConfigurationResumeStrategy.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisUserConfigurationResumeStrategy.java new file mode 100644 index 0000000..4d78990 --- /dev/null +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisUserConfigurationResumeStrategy.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.aws2.kinesis.consumer; + +import org.apache.camel.component.aws2.kinesis.Kinesis2Configuration; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; +import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; + +public class KinesisUserConfigurationResumeStrategy implements KinesisResumeStrategy { + private final Kinesis2Configuration configuration; + + public KinesisUserConfigurationResumeStrategy(Kinesis2Configuration configuration) { + this.configuration = configuration; + } + + private boolean hasSequenceNumber() { + return !configuration.getSequenceNumber().isEmpty() + && (configuration.getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) + || configuration.getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER)); + } + + @Override + public void resume(GetShardIteratorRequest.Builder resumable) { + if (hasSequenceNumber()) { + resumable.startingSequenceNumber(configuration.getSequenceNumber()); + } + } +} diff --git a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2KinesisComponentBuilderFactory.java b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2KinesisComponentBuilderFactory.java index c6ac5e1..061c1c0 100644 --- a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2KinesisComponentBuilderFactory.java +++ b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2KinesisComponentBuilderFactory.java @@ -286,6 +286,24 @@ public interface Aws2KinesisComponentBuilderFactory { return this; } /** + * Defines a resume strategy for AWS Kinesis. The default strategy reads + * the sequenceNumber if provided. + * + * The option is a: + * <code>org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy</code> type. + * + * Default: KinesisUserConfigurationResumeStrategy + * Group: consumer + * + * @param resumeStrategy the value to set + * @return the dsl builder + */ + default Aws2KinesisComponentBuilder resumeStrategy( + org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy resumeStrategy) { + doSetProperty("resumeStrategy", resumeStrategy); + return this; + } + /** * The sequence number to start polling from. Required if iteratorType * is set to AFTER_SEQUENCE_NUMBER or AT_SEQUENCE_NUMBER. * @@ -448,6 +466,7 @@ public interface Aws2KinesisComponentBuilderFactory { case "bridgeErrorHandler": ((Kinesis2Component) component).setBridgeErrorHandler((boolean) value); return true; case "iteratorType": getOrCreateConfiguration((Kinesis2Component) component).setIteratorType((software.amazon.awssdk.services.kinesis.model.ShardIteratorType) value); return true; case "maxResultsPerRequest": getOrCreateConfiguration((Kinesis2Component) component).setMaxResultsPerRequest((int) value); return true; + case "resumeStrategy": getOrCreateConfiguration((Kinesis2Component) component).setResumeStrategy((org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy) value); return true; case "sequenceNumber": getOrCreateConfiguration((Kinesis2Component) component).setSequenceNumber((java.lang.String) value); return true; case "shardClosed": getOrCreateConfiguration((Kinesis2Component) component).setShardClosed((org.apache.camel.component.aws2.kinesis.Kinesis2ShardClosedStrategyEnum) value); return true; case "shardId": getOrCreateConfiguration((Kinesis2Component) component).setShardId((java.lang.String) value); return true; diff --git a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Kinesis2EndpointBuilderFactory.java b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Kinesis2EndpointBuilderFactory.java index 958879b8..77f6817 100644 --- a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Kinesis2EndpointBuilderFactory.java +++ b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Kinesis2EndpointBuilderFactory.java @@ -438,6 +438,42 @@ public interface Kinesis2EndpointBuilderFactory { return this; } /** + * Defines a resume strategy for AWS Kinesis. The default strategy reads + * the sequenceNumber if provided. + * + * The option is a: + * <code>org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy</code> type. + * + * Default: KinesisUserConfigurationResumeStrategy + * Group: consumer + * + * @param resumeStrategy the value to set + * @return the dsl builder + */ + default Kinesis2EndpointConsumerBuilder resumeStrategy( + Object resumeStrategy) { + doSetProperty("resumeStrategy", resumeStrategy); + return this; + } + /** + * Defines a resume strategy for AWS Kinesis. The default strategy reads + * the sequenceNumber if provided. + * + * The option will be converted to a + * <code>org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy</code> type. + * + * Default: KinesisUserConfigurationResumeStrategy + * Group: consumer + * + * @param resumeStrategy the value to set + * @return the dsl builder + */ + default Kinesis2EndpointConsumerBuilder resumeStrategy( + String resumeStrategy) { + doSetProperty("resumeStrategy", resumeStrategy); + return this; + } + /** * If the polling consumer did not poll any files, you can enable this * option to send an empty message (no body) instead. *