This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 554750528de670885cd36914097cfabe78c6de60 Author: Otavio Rodolfo Piske <[email protected]> AuthorDate: Tue Feb 8 17:20:53 2022 +0100 CAMEL-15562: removed the resume strategy option from components - removed the resume strategy option from the file component - removed the resume strategy option from Kafka component - removed the resume strategy option from the CouchDB component - removed the resume strategy option from the AWS 2 Kinesis component --- .../camel/catalog/components/aws2-kinesis.json | 2 - .../apache/camel/catalog/components/couchdb.json | 1 - .../org/apache/camel/catalog/components/file.json | 1 - .../org/apache/camel/catalog/components/kafka.json | 2 - .../aws2/kinesis/Kinesis2ComponentConfigurer.java | 6 - .../aws2/kinesis/Kinesis2EndpointConfigurer.java | 6 - .../aws2/kinesis/Kinesis2EndpointUriFactory.java | 3 +- .../camel/component/aws2/kinesis/aws2-kinesis.json | 2 - .../aws2/kinesis/Kinesis2Configuration.java | 14 --- .../component/aws2/kinesis/Kinesis2Consumer.java | 7 +- .../couchdb/CouchDbEndpointConfigurer.java | 6 - .../couchdb/CouchDbEndpointUriFactory.java | 3 +- .../apache/camel/component/couchdb/couchdb.json | 1 - .../component/couchdb/CouchDbChangesetTracker.java | 4 +- .../camel/component/couchdb/CouchDbConsumer.java | 14 ++- .../camel/component/couchdb/CouchDbEndpoint.java | 15 --- ...tory.java => CouchDbResumeStrategyFactory.java} | 10 +- .../component/file/FileEndpointConfigurer.java | 6 - .../component/file/FileEndpointUriFactory.java | 3 +- .../org/apache/camel/component/file/file.json | 1 - .../apache/camel/component/file/FileConsumer.java | 18 +-- .../apache/camel/component/file/FileEndpoint.java | 20 ---- .../component/kafka/KafkaComponentConfigurer.java | 6 - .../component/kafka/KafkaEndpointConfigurer.java | 6 - .../component/kafka/KafkaEndpointUriFactory.java | 3 +- .../org/apache/camel/component/kafka/kafka.json | 2 - .../camel/component/kafka/KafkaConfiguration.java | 24 ---- .../consumer/support/ResumeStrategyFactory.java | 5 - .../KafkaConsumerWithResumeStrategyIT.java | 123 --------------------- .../FileConsumerResumeFromOffsetStrategyTest.java | 18 ++- .../file/FileConsumerResumeStrategyTest.java | 26 +++-- .../dsl/Aws2KinesisComponentBuilderFactory.java | 19 ---- .../dsl/KafkaComponentBuilderFactory.java | 25 ----- .../dsl/CouchDbEndpointBuilderFactory.java | 36 ------ .../endpoint/dsl/FileEndpointBuilderFactory.java | 38 ------- .../endpoint/dsl/KafkaEndpointBuilderFactory.java | 48 -------- .../dsl/Kinesis2EndpointBuilderFactory.java | 36 ------ 37 files changed, 70 insertions(+), 490 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-kinesis.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-kinesis.json index a498481..6902ece 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-kinesis.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/aws2-kinesis.json @@ -36,7 +36,6 @@ "bridgeErrorHandler": { "kind": "property", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a me [...] "iteratorType": { "kind": "property", "displayName": "Iterator Type", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "software.amazon.awssdk.services.kinesis.model.ShardIteratorType", "enum": [ "AT_SEQUENCE_NUMBER", "AFTER_SEQUENCE_NUMBER", "TRIM_HORIZON", "LATEST", "AT_TIMESTAMP", "null" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "TRIM_HORIZON", "configurationClass": "org.apache.camel.component.aws2.kinesi [...] "maxResultsPerRequest": { "kind": "property", "displayName": "Max Results Per Request", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1, "configurationClass": "org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", "configurationField": "configuration", "description": "Maximum number of records that will be fetched in each poll" }, - "resumeStrategy": { "kind": "property", "displayName": "Resume Strategy", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "KinesisUserConfigurationResumeStrategy", "configurationClass": "org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", "configurationField": "configuration", "descri [...] "sequenceNumber": { "kind": "property", "displayName": "Sequence Number", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", "configurationField": "configuration", "description": "The sequence number to start polling from. Required if iteratorType is set to AFTER_SEQUENCE_NUMBER or [...] "shardClosed": { "kind": "property", "displayName": "Shard Closed", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.aws2.kinesis.Kinesis2ShardClosedStrategyEnum", "enum": [ "ignore", "fail", "silent" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "ignore", "configurationClass": "org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", "configurationField": "configuration", "des [...] "shardId": { "kind": "property", "displayName": "Shard Id", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", "configurationField": "configuration", "description": "Defines which shardId in the Kinesis stream to get records from" }, @@ -60,7 +59,6 @@ "bridgeErrorHandler": { "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a m [...] "iteratorType": { "kind": "parameter", "displayName": "Iterator Type", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "software.amazon.awssdk.services.kinesis.model.ShardIteratorType", "enum": [ "AT_SEQUENCE_NUMBER", "AFTER_SEQUENCE_NUMBER", "TRIM_HORIZON", "LATEST", "AT_TIMESTAMP", "null" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "TRIM_HORIZON", "configurationClass": "org.apache.camel.component.aws2.kines [...] "maxResultsPerRequest": { "kind": "parameter", "displayName": "Max Results Per Request", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1, "configurationClass": "org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", "configurationField": "configuration", "description": "Maximum number of records that will be fetched in each poll" }, - "resumeStrategy": { "kind": "parameter", "displayName": "Resume Strategy", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "KinesisUserConfigurationResumeStrategy", "configurationClass": "org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", "configurationField": "configuration", "descr [...] "sendEmptyMessageWhenIdle": { "kind": "parameter", "displayName": "Send Empty Message When Idle", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If the polling consumer did not poll any files, you can enable this option to send an empty message (no body) instead." }, "sequenceNumber": { "kind": "parameter", "displayName": "Sequence Number", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", "configurationField": "configuration", "description": "The sequence number to start polling from. Required if iteratorType is set to AFTER_SEQUENCE_NUMBER or [...] "shardClosed": { "kind": "parameter", "displayName": "Shard Closed", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.aws2.kinesis.Kinesis2ShardClosedStrategyEnum", "enum": [ "ignore", "fail", "silent" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "ignore", "configurationClass": "org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", "configurationField": "configuration", "de [...] diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/couchdb.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/couchdb.json index 69fdb92..9f4a1e5 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/couchdb.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/couchdb.json @@ -35,7 +35,6 @@ "bridgeErrorHandler": { "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a m [...] "deletes": { "kind": "parameter", "displayName": "Deletes", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Document deletes are published as events" }, "heartbeat": { "kind": "parameter", "displayName": "Heartbeat", "group": "consumer", "label": "consumer", "required": false, "type": "duration", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "30000", "description": "How often to send an empty message to keep socket alive in millis" }, - "resumeStrategy": { "kind": "parameter", "displayName": "Resume Strategy", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.couchdb.consumer.CouchDbResumeStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a custom resume strategy for tracking changes from CouchDB. It allows tracking from a specific point (i.e.: since the given update sequence, the latest sequence, etc)." }, "style": { "kind": "parameter", "displayName": "Style", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "all_docs", "main_only" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "main_only", "description": "Specifies how many revisions are returned in the changes array. The default, main_only, will only return the current winning revision; all_docs will return all leaf revisions (inclu [...] "updates": { "kind": "parameter", "displayName": "Updates", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Document inserts\/updates are published as events" }, "exceptionHandler": { "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the con [...] diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/file.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/file.json index 721f7a4..f6e6025 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/file.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/file.json @@ -49,7 +49,6 @@ "pollStrategy": { "kind": "parameter", "displayName": "Poll Strategy", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.PollingConsumerPollStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "A pluggable org.apache.camel.PollingConsumerPollingStrategy allowing you to provide your custom implementation to control error handling usually occurred during the poll operation [...] "probeContentType": { "kind": "parameter", "displayName": "Probe Content Type", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether to enable probing of the content type. If enable then the consumer uses Files#probeContentType(java.nio.file.Path) to determine the content-type of the file, and store that as a he [...] "processStrategy": { "kind": "parameter", "displayName": "Process Strategy", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.file.GenericFileProcessStrategy<java.io.File>", "deprecated": false, "autowired": false, "secret": false, "description": "A pluggable org.apache.camel.component.file.GenericFileProcessStrategy allowing you to implement your own readLock option or similar. Can also be used [...] - "resumeStrategy": { "kind": "parameter", "displayName": "Resume Strategy", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.file.consumer.FileConsumerResumeStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "Set a resume strategy for files. This makes it possible to define a strategy for resuming reading files after the last point before stopping the application. [...] "startingDirectoryMustExist": { "kind": "parameter", "displayName": "Starting Directory Must Exist", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the starting directory must exist. Mind that the autoCreate option is default enabled, which means the starting directory is normally auto created if it doesn' [...] "startingDirectoryMustHaveAccess": { "kind": "parameter", "displayName": "Starting Directory Must Have Access", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the starting directory has access permissions. Mind that the startingDirectoryMustExist parameter must be set to true in order to verify that the di [...] "appendChars": { "kind": "parameter", "displayName": "Append Chars", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Used to append characters (text) after writing files. This can for example be used to add new lines or other separators when writing and appending new files or existing files. To specify new-line (slash-n or slash-r) or tab (slash-t) [...] diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json index 4758ba8..5007791 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json @@ -55,7 +55,6 @@ "partitionAssignor": { "kind": "property", "displayName": "Partition Assignor", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.clients.consumer.RangeAssignor", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The class name of the partition assignme [...] "pollOnError": { "kind": "property", "displayName": "Poll On Error", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.PollOnError", "enum": [ "DISCARD", "ERROR_HANDLER", "RECONNECT", "RETRY", "STOP" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "ERROR_HANDLER", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "de [...] "pollTimeoutMs": { "kind": "property", "displayName": "Poll Timeout Ms", "group": "consumer", "label": "consumer", "required": false, "type": "duration", "javaType": "java.lang.Long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "5000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The timeout used when polling the KafkaConsumer." }, - "resumeStrategy": { "kind": "property", "displayName": "Resume Strategy", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "This option allows the user to set a custom resume s [...] "seekTo": { "kind": "property", "displayName": "Seek To", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "beginning", "end" ], "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Set if KafkaConsumer will read from beginning or end on startup: beginning : read from beginning [...] "sessionTimeoutMs": { "kind": "property", "displayName": "Session Timeout Ms", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "10000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The timeout used to detect failures when using Kafka's group management facilities." }, "specificAvroReader": { "kind": "property", "displayName": "Specific Avro Reader", "group": "consumer", "label": "confluent,consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "This enables the use of a specific Avro reader for use with the Confluent Platf [...] @@ -161,7 +160,6 @@ "partitionAssignor": { "kind": "parameter", "displayName": "Partition Assignor", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.clients.consumer.RangeAssignor", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The class name of the partition assignm [...] "pollOnError": { "kind": "parameter", "displayName": "Poll On Error", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.PollOnError", "enum": [ "DISCARD", "ERROR_HANDLER", "RECONNECT", "RETRY", "STOP" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "ERROR_HANDLER", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "d [...] "pollTimeoutMs": { "kind": "parameter", "displayName": "Poll Timeout Ms", "group": "consumer", "label": "consumer", "required": false, "type": "duration", "javaType": "java.lang.Long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "5000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The timeout used when polling the KafkaConsumer." }, - "resumeStrategy": { "kind": "parameter", "displayName": "Resume Strategy", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "This option allows the user to set a custom resume [...] "seekTo": { "kind": "parameter", "displayName": "Seek To", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "beginning", "end" ], "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Set if KafkaConsumer will read from beginning or end on startup: beginning : read from beginning [...] "sessionTimeoutMs": { "kind": "parameter", "displayName": "Session Timeout Ms", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "10000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The timeout used to detect failures when using Kafka's group management facilities." }, "specificAvroReader": { "kind": "parameter", "displayName": "Specific Avro Reader", "group": "consumer", "label": "confluent,consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "This enables the use of a specific Avro reader for use with the Confluent Plat [...] diff --git a/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2ComponentConfigurer.java b/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2ComponentConfigurer.java index 97bb77a..1a15fdc 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2ComponentConfigurer.java +++ b/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2ComponentConfigurer.java @@ -54,8 +54,6 @@ public class Kinesis2ComponentConfigurer extends PropertyConfigurerSupport imple case "proxyprotocol": case "proxyProtocol": getOrCreateConfiguration(target).setProxyProtocol(property(camelContext, software.amazon.awssdk.core.Protocol.class, value)); return true; case "region": getOrCreateConfiguration(target).setRegion(property(camelContext, java.lang.String.class, value)); return true; - case "resumestrategy": - case "resumeStrategy": getOrCreateConfiguration(target).setResumeStrategy(property(camelContext, org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy.class, value)); return true; case "secretkey": case "secretKey": getOrCreateConfiguration(target).setSecretKey(property(camelContext, java.lang.String.class, value)); return true; case "sequencenumber": @@ -108,8 +106,6 @@ public class Kinesis2ComponentConfigurer extends PropertyConfigurerSupport imple case "proxyprotocol": case "proxyProtocol": return software.amazon.awssdk.core.Protocol.class; case "region": return java.lang.String.class; - case "resumestrategy": - case "resumeStrategy": return org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy.class; case "secretkey": case "secretKey": return java.lang.String.class; case "sequencenumber": @@ -158,8 +154,6 @@ public class Kinesis2ComponentConfigurer extends PropertyConfigurerSupport imple case "proxyprotocol": case "proxyProtocol": return getOrCreateConfiguration(target).getProxyProtocol(); case "region": return getOrCreateConfiguration(target).getRegion(); - case "resumestrategy": - case "resumeStrategy": return getOrCreateConfiguration(target).getResumeStrategy(); case "secretkey": case "secretKey": return getOrCreateConfiguration(target).getSecretKey(); case "sequencenumber": diff --git a/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2EndpointConfigurer.java b/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2EndpointConfigurer.java index 00e68d0..b165e71 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2EndpointConfigurer.java +++ b/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2EndpointConfigurer.java @@ -62,8 +62,6 @@ public class Kinesis2EndpointConfigurer extends PropertyConfigurerSupport implem case "region": target.getConfiguration().setRegion(property(camelContext, java.lang.String.class, value)); return true; case "repeatcount": case "repeatCount": target.setRepeatCount(property(camelContext, long.class, value)); return true; - case "resumestrategy": - case "resumeStrategy": target.getConfiguration().setResumeStrategy(property(camelContext, org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy.class, value)); return true; case "runlogginglevel": case "runLoggingLevel": target.setRunLoggingLevel(property(camelContext, org.apache.camel.LoggingLevel.class, value)); return true; case "scheduledexecutorservice": @@ -146,8 +144,6 @@ public class Kinesis2EndpointConfigurer extends PropertyConfigurerSupport implem case "region": return java.lang.String.class; case "repeatcount": case "repeatCount": return long.class; - case "resumestrategy": - case "resumeStrategy": return org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy.class; case "runlogginglevel": case "runLoggingLevel": return org.apache.camel.LoggingLevel.class; case "scheduledexecutorservice": @@ -226,8 +222,6 @@ public class Kinesis2EndpointConfigurer extends PropertyConfigurerSupport implem case "region": return target.getConfiguration().getRegion(); case "repeatcount": case "repeatCount": return target.getRepeatCount(); - case "resumestrategy": - case "resumeStrategy": return target.getConfiguration().getResumeStrategy(); case "runlogginglevel": case "runLoggingLevel": return target.getRunLoggingLevel(); case "scheduledexecutorservice": diff --git a/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2EndpointUriFactory.java b/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2EndpointUriFactory.java index c04c83b..5627b87 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2EndpointUriFactory.java +++ b/components/camel-aws/camel-aws2-kinesis/src/generated/java/org/apache/camel/component/aws2/kinesis/Kinesis2EndpointUriFactory.java @@ -21,7 +21,7 @@ public class Kinesis2EndpointUriFactory extends org.apache.camel.support.compone private static final Set<String> SECRET_PROPERTY_NAMES; private static final Set<String> MULTI_VALUE_PREFIXES; static { - Set<String> props = new HashSet<>(39); + Set<String> props = new HashSet<>(38); props.add("backoffMultiplier"); props.add("initialDelay"); props.add("scheduler"); @@ -57,7 +57,6 @@ public class Kinesis2EndpointUriFactory extends org.apache.camel.support.compone props.add("startScheduler"); props.add("accessKey"); props.add("overrideEndpoint"); - props.add("resumeStrategy"); props.add("maxResultsPerRequest"); props.add("region"); props.add("exceptionHandler"); diff --git a/components/camel-aws/camel-aws2-kinesis/src/generated/resources/org/apache/camel/component/aws2/kinesis/aws2-kinesis.json b/components/camel-aws/camel-aws2-kinesis/src/generated/resources/org/apache/camel/component/aws2/kinesis/aws2-kinesis.json index a498481..6902ece 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/generated/resources/org/apache/camel/component/aws2/kinesis/aws2-kinesis.json +++ b/components/camel-aws/camel-aws2-kinesis/src/generated/resources/org/apache/camel/component/aws2/kinesis/aws2-kinesis.json @@ -36,7 +36,6 @@ "bridgeErrorHandler": { "kind": "property", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a me [...] "iteratorType": { "kind": "property", "displayName": "Iterator Type", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "software.amazon.awssdk.services.kinesis.model.ShardIteratorType", "enum": [ "AT_SEQUENCE_NUMBER", "AFTER_SEQUENCE_NUMBER", "TRIM_HORIZON", "LATEST", "AT_TIMESTAMP", "null" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "TRIM_HORIZON", "configurationClass": "org.apache.camel.component.aws2.kinesi [...] "maxResultsPerRequest": { "kind": "property", "displayName": "Max Results Per Request", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1, "configurationClass": "org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", "configurationField": "configuration", "description": "Maximum number of records that will be fetched in each poll" }, - "resumeStrategy": { "kind": "property", "displayName": "Resume Strategy", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "KinesisUserConfigurationResumeStrategy", "configurationClass": "org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", "configurationField": "configuration", "descri [...] "sequenceNumber": { "kind": "property", "displayName": "Sequence Number", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", "configurationField": "configuration", "description": "The sequence number to start polling from. Required if iteratorType is set to AFTER_SEQUENCE_NUMBER or [...] "shardClosed": { "kind": "property", "displayName": "Shard Closed", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.aws2.kinesis.Kinesis2ShardClosedStrategyEnum", "enum": [ "ignore", "fail", "silent" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "ignore", "configurationClass": "org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", "configurationField": "configuration", "des [...] "shardId": { "kind": "property", "displayName": "Shard Id", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", "configurationField": "configuration", "description": "Defines which shardId in the Kinesis stream to get records from" }, @@ -60,7 +59,6 @@ "bridgeErrorHandler": { "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a m [...] "iteratorType": { "kind": "parameter", "displayName": "Iterator Type", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "software.amazon.awssdk.services.kinesis.model.ShardIteratorType", "enum": [ "AT_SEQUENCE_NUMBER", "AFTER_SEQUENCE_NUMBER", "TRIM_HORIZON", "LATEST", "AT_TIMESTAMP", "null" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "TRIM_HORIZON", "configurationClass": "org.apache.camel.component.aws2.kines [...] "maxResultsPerRequest": { "kind": "parameter", "displayName": "Max Results Per Request", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1, "configurationClass": "org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", "configurationField": "configuration", "description": "Maximum number of records that will be fetched in each poll" }, - "resumeStrategy": { "kind": "parameter", "displayName": "Resume Strategy", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "KinesisUserConfigurationResumeStrategy", "configurationClass": "org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", "configurationField": "configuration", "descr [...] "sendEmptyMessageWhenIdle": { "kind": "parameter", "displayName": "Send Empty Message When Idle", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "If the polling consumer did not poll any files, you can enable this option to send an empty message (no body) instead." }, "sequenceNumber": { "kind": "parameter", "displayName": "Sequence Number", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", "configurationField": "configuration", "description": "The sequence number to start polling from. Required if iteratorType is set to AFTER_SEQUENCE_NUMBER or [...] "shardClosed": { "kind": "parameter", "displayName": "Shard Closed", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.aws2.kinesis.Kinesis2ShardClosedStrategyEnum", "enum": [ "ignore", "fail", "silent" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "ignore", "configurationClass": "org.apache.camel.component.aws2.kinesis.Kinesis2Configuration", "configurationField": "configuration", "de [...] diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java index 0ded74b..db57da4 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Configuration.java @@ -17,7 +17,6 @@ package org.apache.camel.component.aws2.kinesis; import org.apache.camel.RuntimeCamelException; -import org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriParams; @@ -84,11 +83,6 @@ public class Kinesis2Configuration implements Cloneable { "static credentials to be passed in.") private boolean useDefaultCredentialsProvider; - @UriParam(label = "consumer", - description = "Defines a resume strategy for AWS Kinesis. The default strategy reads the sequenceNumber if provided", - defaultValue = "KinesisUserConfigurationResumeStrategy") - private KinesisResumeStrategy resumeStrategy; - public KinesisClient getAmazonKinesisClient() { return amazonKinesisClient; } @@ -233,14 +227,6 @@ public class Kinesis2Configuration implements Cloneable { this.useDefaultCredentialsProvider = useDefaultCredentialsProvider; } - public KinesisResumeStrategy getResumeStrategy() { - return resumeStrategy; - } - - public void setResumeStrategy(KinesisResumeStrategy resumeStrategy) { - this.resumeStrategy = resumeStrategy; - } - // ************************************************* // // ************************************************* diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java index 8ad6181..f8bee4c 100644 --- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java +++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java @@ -171,11 +171,10 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer { } private void resume(GetShardIteratorRequest.Builder req) { - KinesisResumeStrategy resumeStrategy; - if (getEndpoint().getConfiguration().getResumeStrategy() == null) { + KinesisResumeStrategy resumeStrategy = getEndpoint().getCamelContext().hasService(KinesisResumeStrategy.class); + + if (resumeStrategy == null) { resumeStrategy = new KinesisUserConfigurationResumeStrategy(getEndpoint().getConfiguration()); - } else { - resumeStrategy = getEndpoint().getConfiguration().getResumeStrategy(); } resumeStrategy.setRequestBuilder(req); diff --git a/components/camel-couchdb/src/generated/java/org/apache/camel/component/couchdb/CouchDbEndpointConfigurer.java b/components/camel-couchdb/src/generated/java/org/apache/camel/component/couchdb/CouchDbEndpointConfigurer.java index 5452c62..6629f7f 100644 --- a/components/camel-couchdb/src/generated/java/org/apache/camel/component/couchdb/CouchDbEndpointConfigurer.java +++ b/components/camel-couchdb/src/generated/java/org/apache/camel/component/couchdb/CouchDbEndpointConfigurer.java @@ -34,8 +34,6 @@ public class CouchDbEndpointConfigurer extends PropertyConfigurerSupport impleme case "lazystartproducer": case "lazyStartProducer": target.setLazyStartProducer(property(camelContext, boolean.class, value)); return true; case "password": target.setPassword(property(camelContext, java.lang.String.class, value)); return true; - case "resumestrategy": - case "resumeStrategy": target.setResumeStrategy(property(camelContext, org.apache.camel.component.couchdb.consumer.CouchDbResumeStrategy.class, value)); return true; case "style": target.setStyle(property(camelContext, java.lang.String.class, value)); return true; case "updates": target.setUpdates(property(camelContext, boolean.class, value)); return true; case "username": target.setUsername(property(camelContext, java.lang.String.class, value)); return true; @@ -59,8 +57,6 @@ public class CouchDbEndpointConfigurer extends PropertyConfigurerSupport impleme case "lazystartproducer": case "lazyStartProducer": return boolean.class; case "password": return java.lang.String.class; - case "resumestrategy": - case "resumeStrategy": return org.apache.camel.component.couchdb.consumer.CouchDbResumeStrategy.class; case "style": return java.lang.String.class; case "updates": return boolean.class; case "username": return java.lang.String.class; @@ -85,8 +81,6 @@ public class CouchDbEndpointConfigurer extends PropertyConfigurerSupport impleme case "lazystartproducer": case "lazyStartProducer": return target.isLazyStartProducer(); case "password": return target.getPassword(); - case "resumestrategy": - case "resumeStrategy": return target.getResumeStrategy(); case "style": return target.getStyle(); case "updates": return target.isUpdates(); case "username": return target.getUsername(); diff --git a/components/camel-couchdb/src/generated/java/org/apache/camel/component/couchdb/CouchDbEndpointUriFactory.java b/components/camel-couchdb/src/generated/java/org/apache/camel/component/couchdb/CouchDbEndpointUriFactory.java index 92d9d31..92f01c8 100644 --- a/components/camel-couchdb/src/generated/java/org/apache/camel/component/couchdb/CouchDbEndpointUriFactory.java +++ b/components/camel-couchdb/src/generated/java/org/apache/camel/component/couchdb/CouchDbEndpointUriFactory.java @@ -21,7 +21,7 @@ public class CouchDbEndpointUriFactory extends org.apache.camel.support.componen private static final Set<String> SECRET_PROPERTY_NAMES; private static final Set<String> MULTI_VALUE_PREFIXES; static { - Set<String> props = new HashSet<>(16); + Set<String> props = new HashSet<>(15); props.add("deletes"); props.add("heartbeat"); props.add("exchangePattern"); @@ -34,7 +34,6 @@ public class CouchDbEndpointUriFactory extends org.apache.camel.support.componen props.add("password"); props.add("bridgeErrorHandler"); props.add("port"); - props.add("resumeStrategy"); props.add("style"); props.add("exceptionHandler"); props.add("username"); diff --git a/components/camel-couchdb/src/generated/resources/org/apache/camel/component/couchdb/couchdb.json b/components/camel-couchdb/src/generated/resources/org/apache/camel/component/couchdb/couchdb.json index 69fdb92..9f4a1e5 100644 --- a/components/camel-couchdb/src/generated/resources/org/apache/camel/component/couchdb/couchdb.json +++ b/components/camel-couchdb/src/generated/resources/org/apache/camel/component/couchdb/couchdb.json @@ -35,7 +35,6 @@ "bridgeErrorHandler": { "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a m [...] "deletes": { "kind": "parameter", "displayName": "Deletes", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Document deletes are published as events" }, "heartbeat": { "kind": "parameter", "displayName": "Heartbeat", "group": "consumer", "label": "consumer", "required": false, "type": "duration", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "30000", "description": "How often to send an empty message to keep socket alive in millis" }, - "resumeStrategy": { "kind": "parameter", "displayName": "Resume Strategy", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.couchdb.consumer.CouchDbResumeStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "Sets a custom resume strategy for tracking changes from CouchDB. It allows tracking from a specific point (i.e.: since the given update sequence, the latest sequence, etc)." }, "style": { "kind": "parameter", "displayName": "Style", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "all_docs", "main_only" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "main_only", "description": "Specifies how many revisions are returned in the changes array. The default, main_only, will only return the current winning revision; all_docs will return all leaf revisions (inclu [...] "updates": { "kind": "parameter", "displayName": "Updates", "group": "consumer", "label": "consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "description": "Document inserts\/updates are published as events" }, "exceptionHandler": { "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the con [...] diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java index 9e305ad..58a9a70 100644 --- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java +++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java @@ -22,7 +22,7 @@ import com.google.gson.JsonObject; import org.apache.camel.Exchange; import org.apache.camel.component.couchdb.consumer.CouchDbResumable; import org.apache.camel.component.couchdb.consumer.CouchDbResumeStrategy; -import org.apache.camel.component.couchdb.consumer.CouchDdResumeStrategyFactory; +import org.apache.camel.component.couchdb.consumer.CouchDbResumeStrategyFactory; import org.apache.camel.support.task.BlockingTask; import org.apache.camel.support.task.Tasks; import org.apache.camel.support.task.budget.Budgets; @@ -53,7 +53,7 @@ public class CouchDbChangesetTracker implements Runnable { CouchDbResumable resumable = new CouchDbResumable(couchClient, sequence); if (sequence == null) { - CouchDbResumeStrategy resumeStrategy = CouchDdResumeStrategyFactory.newResumeStrategy(this.endpoint); + CouchDbResumeStrategy resumeStrategy = CouchDbResumeStrategyFactory.newResumeStrategy(this.consumer); resumeStrategy.setResumable(resumable); resumeStrategy.resume(); diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java index 3a8e478..1a22bf3 100644 --- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java +++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java @@ -21,14 +21,17 @@ import java.util.concurrent.ExecutorService; import com.google.gson.JsonObject; import org.apache.camel.Exchange; import org.apache.camel.Processor; +import org.apache.camel.ResumeAware; +import org.apache.camel.component.couchdb.consumer.CouchDbResumeStrategy; import org.apache.camel.support.DefaultConsumer; -public class CouchDbConsumer extends DefaultConsumer { +public class CouchDbConsumer extends DefaultConsumer implements ResumeAware<CouchDbResumeStrategy> { private final CouchDbClientWrapper couchClient; private final CouchDbEndpoint endpoint; private ExecutorService executor; private CouchDbChangesetTracker task; + private CouchDbResumeStrategy resumeStrategy; public CouchDbConsumer(CouchDbEndpoint endpoint, CouchDbClientWrapper couchClient, Processor processor) { super(endpoint, processor); @@ -36,6 +39,15 @@ public class CouchDbConsumer extends DefaultConsumer { this.endpoint = endpoint; } + @Override + public void setResumeStrategy(CouchDbResumeStrategy resumeStrategy) { + this.resumeStrategy = resumeStrategy; + } + + public CouchDbResumeStrategy getResumeStrategy() { + return resumeStrategy; + } + public Exchange createExchange(String seq, String id, JsonObject obj, boolean deleted) { Exchange exchange = createExchange(false); exchange.getIn().setHeader(CouchDbConstants.HEADER_DATABASE, endpoint.getDatabase()); diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbEndpoint.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbEndpoint.java index b2db700..a45d0c3 100644 --- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbEndpoint.java +++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbEndpoint.java @@ -22,7 +22,6 @@ import org.apache.camel.Category; import org.apache.camel.Consumer; import org.apache.camel.Processor; import org.apache.camel.Producer; -import org.apache.camel.component.couchdb.consumer.CouchDbResumeStrategy; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; @@ -70,8 +69,6 @@ public class CouchDbEndpoint extends DefaultEndpoint { private boolean deletes = true; @UriParam(label = "consumer", defaultValue = "true") private boolean updates = true; - @UriParam(label = "consumer") - private CouchDbResumeStrategy resumeStrategy; public CouchDbEndpoint() { } @@ -238,16 +235,4 @@ public class CouchDbEndpoint extends DefaultEndpoint { public void setUpdates(boolean updates) { this.updates = updates; } - - public CouchDbResumeStrategy getResumeStrategy() { - return resumeStrategy; - } - - /** - * Sets a custom resume strategy for tracking changes from CouchDB. It allows tracking from a specific point (i.e.: - * since the given update sequence, the latest sequence, etc). - */ - public void setResumeStrategy(CouchDbResumeStrategy resumeStrategy) { - this.resumeStrategy = resumeStrategy; - } } diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDdResumeStrategyFactory.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeStrategyFactory.java similarity index 82% rename from components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDdResumeStrategyFactory.java rename to components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeStrategyFactory.java index 15d4543..ce0196f 100644 --- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDdResumeStrategyFactory.java +++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/consumer/CouchDbResumeStrategyFactory.java @@ -17,14 +17,14 @@ package org.apache.camel.component.couchdb.consumer; -import org.apache.camel.component.couchdb.CouchDbEndpoint; +import org.apache.camel.component.couchdb.CouchDbConsumer; -public final class CouchDdResumeStrategyFactory { - private CouchDdResumeStrategyFactory() { +public final class CouchDbResumeStrategyFactory { + private CouchDbResumeStrategyFactory() { } - public static CouchDbResumeStrategy newResumeStrategy(CouchDbEndpoint endpoint) { - CouchDbResumeStrategy resumeStrategy = endpoint.getResumeStrategy(); + public static CouchDbResumeStrategy newResumeStrategy(CouchDbConsumer consumer) { + CouchDbResumeStrategy resumeStrategy = consumer.getResumeStrategy(); if (resumeStrategy == null) { resumeStrategy = new LatestUpdateSequenceResumeStrategy(); diff --git a/components/camel-file/src/generated/java/org/apache/camel/component/file/FileEndpointConfigurer.java b/components/camel-file/src/generated/java/org/apache/camel/component/file/FileEndpointConfigurer.java index f291e0b..1e914ba 100644 --- a/components/camel-file/src/generated/java/org/apache/camel/component/file/FileEndpointConfigurer.java +++ b/components/camel-file/src/generated/java/org/apache/camel/component/file/FileEndpointConfigurer.java @@ -162,8 +162,6 @@ public class FileEndpointConfigurer extends PropertyConfigurerSupport implements case "renameUsingCopy": target.setRenameUsingCopy(property(camelContext, boolean.class, value)); return true; case "repeatcount": case "repeatCount": target.setRepeatCount(property(camelContext, long.class, value)); return true; - case "resumestrategy": - case "resumeStrategy": target.setResumeStrategy(property(camelContext, org.apache.camel.component.file.consumer.FileConsumerResumeStrategy.class, value)); return true; case "runlogginglevel": case "runLoggingLevel": target.setRunLoggingLevel(property(camelContext, org.apache.camel.LoggingLevel.class, value)); return true; case "scheduledexecutorservice": @@ -340,8 +338,6 @@ public class FileEndpointConfigurer extends PropertyConfigurerSupport implements case "renameUsingCopy": return boolean.class; case "repeatcount": case "repeatCount": return long.class; - case "resumestrategy": - case "resumeStrategy": return org.apache.camel.component.file.consumer.FileConsumerResumeStrategy.class; case "runlogginglevel": case "runLoggingLevel": return org.apache.camel.LoggingLevel.class; case "scheduledexecutorservice": @@ -519,8 +515,6 @@ public class FileEndpointConfigurer extends PropertyConfigurerSupport implements case "renameUsingCopy": return target.isRenameUsingCopy(); case "repeatcount": case "repeatCount": return target.getRepeatCount(); - case "resumestrategy": - case "resumeStrategy": return target.getResumeStrategy(); case "runlogginglevel": case "runLoggingLevel": return target.getRunLoggingLevel(); case "scheduledexecutorservice": diff --git a/components/camel-file/src/generated/java/org/apache/camel/component/file/FileEndpointUriFactory.java b/components/camel-file/src/generated/java/org/apache/camel/component/file/FileEndpointUriFactory.java index d08e6c1d..1910468 100644 --- a/components/camel-file/src/generated/java/org/apache/camel/component/file/FileEndpointUriFactory.java +++ b/components/camel-file/src/generated/java/org/apache/camel/component/file/FileEndpointUriFactory.java @@ -21,7 +21,7 @@ public class FileEndpointUriFactory extends org.apache.camel.support.component.E private static final Set<String> SECRET_PROPERTY_NAMES; private static final Set<String> MULTI_VALUE_PREFIXES; static { - Set<String> props = new HashSet<>(95); + Set<String> props = new HashSet<>(94); props.add("renameUsingCopy"); props.add("moveExistingFileStrategy"); props.add("fileName"); @@ -58,7 +58,6 @@ public class FileEndpointUriFactory extends org.apache.camel.support.component.E props.add("lazyStartProducer"); props.add("delay"); props.add("startScheduler"); - props.add("resumeStrategy"); props.add("readLockMarkerFile"); props.add("readLockTimeout"); props.add("exceptionHandler"); diff --git a/components/camel-file/src/generated/resources/org/apache/camel/component/file/file.json b/components/camel-file/src/generated/resources/org/apache/camel/component/file/file.json index 721f7a4..f6e6025 100644 --- a/components/camel-file/src/generated/resources/org/apache/camel/component/file/file.json +++ b/components/camel-file/src/generated/resources/org/apache/camel/component/file/file.json @@ -49,7 +49,6 @@ "pollStrategy": { "kind": "parameter", "displayName": "Poll Strategy", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.PollingConsumerPollStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "A pluggable org.apache.camel.PollingConsumerPollingStrategy allowing you to provide your custom implementation to control error handling usually occurred during the poll operation [...] "probeContentType": { "kind": "parameter", "displayName": "Probe Content Type", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether to enable probing of the content type. If enable then the consumer uses Files#probeContentType(java.nio.file.Path) to determine the content-type of the file, and store that as a he [...] "processStrategy": { "kind": "parameter", "displayName": "Process Strategy", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.file.GenericFileProcessStrategy<java.io.File>", "deprecated": false, "autowired": false, "secret": false, "description": "A pluggable org.apache.camel.component.file.GenericFileProcessStrategy allowing you to implement your own readLock option or similar. Can also be used [...] - "resumeStrategy": { "kind": "parameter", "displayName": "Resume Strategy", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.component.file.consumer.FileConsumerResumeStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "Set a resume strategy for files. This makes it possible to define a strategy for resuming reading files after the last point before stopping the application. [...] "startingDirectoryMustExist": { "kind": "parameter", "displayName": "Starting Directory Must Exist", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the starting directory must exist. Mind that the autoCreate option is default enabled, which means the starting directory is normally auto created if it doesn' [...] "startingDirectoryMustHaveAccess": { "kind": "parameter", "displayName": "Starting Directory Must Have Access", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the starting directory has access permissions. Mind that the startingDirectoryMustExist parameter must be set to true in order to verify that the di [...] "appendChars": { "kind": "parameter", "displayName": "Append Chars", "group": "producer", "label": "producer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Used to append characters (text) after writing files. This can for example be used to add new lines or other separators when writing and appending new files or existing files. To specify new-line (slash-n or slash-r) or tab (slash-t) [...] diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java b/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java index e9cca6b..1b3ae3b 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java @@ -31,6 +31,7 @@ import java.util.Set; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.Processor; +import org.apache.camel.ResumeAware; import org.apache.camel.component.file.consumer.FileConsumerResumeStrategy; import org.apache.camel.component.file.consumer.FileResumeSet; import org.apache.camel.component.file.consumer.FileSetResumeStrategy; @@ -43,10 +44,10 @@ import org.slf4j.LoggerFactory; /** * File consumer. */ -public class FileConsumer extends GenericFileConsumer<File> { +public class FileConsumer extends GenericFileConsumer<File> implements ResumeAware<FileConsumerResumeStrategy> { private static final Logger LOG = LoggerFactory.getLogger(FileConsumer.class); - private final FileConsumerResumeStrategy resumeStrategy; + private FileConsumerResumeStrategy resumeStrategy; private String endpointPath; private Set<String> extendedAttributes; @@ -59,8 +60,6 @@ public class FileConsumer extends GenericFileConsumer<File> { List<String> attributes = Arrays.asList(endpoint.getExtendedAttributes().split(",")); this.extendedAttributes = new HashSet<>(attributes); } - - resumeStrategy = endpoint.getResumeStrategy(); } @Override @@ -104,8 +103,8 @@ public class FileConsumer extends GenericFileConsumer<File> { GenericFile<File> gf = asGenericFile(endpointPath, file, getEndpoint().getCharset(), getEndpoint().isProbeContentType()); - if (resumeStrategy != null && resumeStrategy instanceof GenericFileResumeStrategy) { - resumeStrategy.resume(gf); + if (resumeStrategy instanceof GenericFileResumeStrategy) { + ((GenericFileResumeStrategy<File>) resumeStrategy).resume(gf); } if (file.isDirectory()) { @@ -172,7 +171,7 @@ public class FileConsumer extends GenericFileConsumer<File> { } } - if (resumeStrategy != null && resumeStrategy instanceof FileSetResumeStrategy) { + if (resumeStrategy instanceof FileSetResumeStrategy) { FileResumeSet resumeSet = new FileResumeSet(dirFiles); resumeStrategy.resume(resumeSet); @@ -306,4 +305,9 @@ public class FileConsumer extends GenericFileConsumer<File> { // underlying file is not return !file.getFile().getAbsolutePath().equals(file.getAbsoluteFilePath()); } + + @Override + public void setResumeStrategy(FileConsumerResumeStrategy resumeStrategy) { + this.resumeStrategy = resumeStrategy; + } } diff --git a/components/camel-file/src/main/java/org/apache/camel/component/file/FileEndpoint.java b/components/camel-file/src/main/java/org/apache/camel/component/file/FileEndpoint.java index c57f0f3..dc51742 100644 --- a/components/camel-file/src/main/java/org/apache/camel/component/file/FileEndpoint.java +++ b/components/camel-file/src/main/java/org/apache/camel/component/file/FileEndpoint.java @@ -31,7 +31,6 @@ import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.PollingConsumer; import org.apache.camel.Processor; -import org.apache.camel.component.file.consumer.FileConsumerResumeStrategy; import org.apache.camel.component.file.strategy.FileMoveExistingStrategy; import org.apache.camel.component.file.strategy.FileProcessStrategyFactory; import org.apache.camel.spi.Metadata; @@ -83,9 +82,6 @@ public class FileEndpoint extends GenericFileEndpoint<File> { @UriParam(label = "producer,advanced") private String chmodDirectory; - @UriParam(label = "consumer,advanced") - private FileConsumerResumeStrategy resumeStrategy; - public FileEndpoint() { } @@ -364,22 +360,6 @@ public class FileEndpoint extends GenericFileEndpoint<File> { this.extendedAttributes = extendedAttributes; } - public FileConsumerResumeStrategy getResumeStrategy() { - return resumeStrategy; - } - - /** - * Set a resume strategy for files. This makes it possible to define a strategy for resuming reading files after the - * last point before stopping the application. - * - * See the {@link FileConsumerResumeStrategy} for implementation details - * - * @param resumeStrategy an instance of the resume strategy to be used - */ - public void setResumeStrategy(FileConsumerResumeStrategy resumeStrategy) { - this.resumeStrategy = resumeStrategy; - } - /** * Chmod value must be between 000 and 777; If there is a leading digit like in 0755 we will ignore it. */ diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java index 0d5a1c2..e2fa43e 100644 --- a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java +++ b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaComponentConfigurer.java @@ -160,8 +160,6 @@ public class KafkaComponentConfigurer extends PropertyConfigurerSupport implemen case "requestRequiredAcks": getOrCreateConfiguration(target).setRequestRequiredAcks(property(camelContext, java.lang.String.class, value)); return true; case "requesttimeoutms": case "requestTimeoutMs": getOrCreateConfiguration(target).setRequestTimeoutMs(property(camelContext, java.lang.Integer.class, value)); return true; - case "resumestrategy": - case "resumeStrategy": getOrCreateConfiguration(target).setResumeStrategy(property(camelContext, org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy.class, value)); return true; case "retries": getOrCreateConfiguration(target).setRetries(property(camelContext, java.lang.Integer.class, value)); return true; case "retrybackoffms": case "retryBackoffMs": getOrCreateConfiguration(target).setRetryBackoffMs(property(camelContext, java.lang.Integer.class, value)); return true; @@ -374,8 +372,6 @@ public class KafkaComponentConfigurer extends PropertyConfigurerSupport implemen case "requestRequiredAcks": return java.lang.String.class; case "requesttimeoutms": case "requestTimeoutMs": return java.lang.Integer.class; - case "resumestrategy": - case "resumeStrategy": return org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy.class; case "retries": return java.lang.Integer.class; case "retrybackoffms": case "retryBackoffMs": return java.lang.Integer.class; @@ -584,8 +580,6 @@ public class KafkaComponentConfigurer extends PropertyConfigurerSupport implemen case "requestRequiredAcks": return getOrCreateConfiguration(target).getRequestRequiredAcks(); case "requesttimeoutms": case "requestTimeoutMs": return getOrCreateConfiguration(target).getRequestTimeoutMs(); - case "resumestrategy": - case "resumeStrategy": return getOrCreateConfiguration(target).getResumeStrategy(); case "retries": return getOrCreateConfiguration(target).getRetries(); case "retrybackoffms": case "retryBackoffMs": return getOrCreateConfiguration(target).getRetryBackoffMs(); diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java index 270e758..2ee0b05 100644 --- a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java +++ b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointConfigurer.java @@ -152,8 +152,6 @@ public class KafkaEndpointConfigurer extends PropertyConfigurerSupport implement case "requestRequiredAcks": target.getConfiguration().setRequestRequiredAcks(property(camelContext, java.lang.String.class, value)); return true; case "requesttimeoutms": case "requestTimeoutMs": target.getConfiguration().setRequestTimeoutMs(property(camelContext, java.lang.Integer.class, value)); return true; - case "resumestrategy": - case "resumeStrategy": target.getConfiguration().setResumeStrategy(property(camelContext, org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy.class, value)); return true; case "retries": target.getConfiguration().setRetries(property(camelContext, java.lang.Integer.class, value)); return true; case "retrybackoffms": case "retryBackoffMs": target.getConfiguration().setRetryBackoffMs(property(camelContext, java.lang.Integer.class, value)); return true; @@ -358,8 +356,6 @@ public class KafkaEndpointConfigurer extends PropertyConfigurerSupport implement case "requestRequiredAcks": return java.lang.String.class; case "requesttimeoutms": case "requestTimeoutMs": return java.lang.Integer.class; - case "resumestrategy": - case "resumeStrategy": return org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy.class; case "retries": return java.lang.Integer.class; case "retrybackoffms": case "retryBackoffMs": return java.lang.Integer.class; @@ -565,8 +561,6 @@ public class KafkaEndpointConfigurer extends PropertyConfigurerSupport implement case "requestRequiredAcks": return target.getConfiguration().getRequestRequiredAcks(); case "requesttimeoutms": case "requestTimeoutMs": return target.getConfiguration().getRequestTimeoutMs(); - case "resumestrategy": - case "resumeStrategy": return target.getConfiguration().getResumeStrategy(); case "retries": return target.getConfiguration().getRetries(); case "retrybackoffms": case "retryBackoffMs": return target.getConfiguration().getRetryBackoffMs(); diff --git a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java index 7ace7fd..250221a 100644 --- a/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java +++ b/components/camel-kafka/src/generated/java/org/apache/camel/component/kafka/KafkaEndpointUriFactory.java @@ -21,7 +21,7 @@ public class KafkaEndpointUriFactory extends org.apache.camel.support.component. private static final Set<String> SECRET_PROPERTY_NAMES; private static final Set<String> MULTI_VALUE_PREFIXES; static { - Set<String> props = new HashSet<>(103); + Set<String> props = new HashSet<>(102); props.add("synchronous"); props.add("queueBufferingMaxMessages"); props.add("allowManualCommit"); @@ -88,7 +88,6 @@ public class KafkaEndpointUriFactory extends org.apache.camel.support.component. props.add("sslKeystorePassword"); props.add("kafkaManualCommitFactory"); props.add("sslEndpointAlgorithm"); - props.add("resumeStrategy"); props.add("topic"); props.add("sslProtocol"); props.add("sslKeymanagerAlgorithm"); diff --git a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json index 4758ba8..5007791 100644 --- a/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json +++ b/components/camel-kafka/src/generated/resources/org/apache/camel/component/kafka/kafka.json @@ -55,7 +55,6 @@ "partitionAssignor": { "kind": "property", "displayName": "Partition Assignor", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.clients.consumer.RangeAssignor", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The class name of the partition assignme [...] "pollOnError": { "kind": "property", "displayName": "Poll On Error", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.PollOnError", "enum": [ "DISCARD", "ERROR_HANDLER", "RECONNECT", "RETRY", "STOP" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "ERROR_HANDLER", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "de [...] "pollTimeoutMs": { "kind": "property", "displayName": "Poll Timeout Ms", "group": "consumer", "label": "consumer", "required": false, "type": "duration", "javaType": "java.lang.Long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "5000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The timeout used when polling the KafkaConsumer." }, - "resumeStrategy": { "kind": "property", "displayName": "Resume Strategy", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "This option allows the user to set a custom resume s [...] "seekTo": { "kind": "property", "displayName": "Seek To", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "beginning", "end" ], "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Set if KafkaConsumer will read from beginning or end on startup: beginning : read from beginning [...] "sessionTimeoutMs": { "kind": "property", "displayName": "Session Timeout Ms", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "10000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The timeout used to detect failures when using Kafka's group management facilities." }, "specificAvroReader": { "kind": "property", "displayName": "Specific Avro Reader", "group": "consumer", "label": "confluent,consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "This enables the use of a specific Avro reader for use with the Confluent Platf [...] @@ -161,7 +160,6 @@ "partitionAssignor": { "kind": "parameter", "displayName": "Partition Assignor", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "org.apache.kafka.clients.consumer.RangeAssignor", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The class name of the partition assignm [...] "pollOnError": { "kind": "parameter", "displayName": "Poll On Error", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.PollOnError", "enum": [ "DISCARD", "ERROR_HANDLER", "RECONNECT", "RETRY", "STOP" ], "deprecated": false, "autowired": false, "secret": false, "defaultValue": "ERROR_HANDLER", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "d [...] "pollTimeoutMs": { "kind": "parameter", "displayName": "Poll Timeout Ms", "group": "consumer", "label": "consumer", "required": false, "type": "duration", "javaType": "java.lang.Long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "5000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The timeout used when polling the KafkaConsumer." }, - "resumeStrategy": { "kind": "parameter", "displayName": "Resume Strategy", "group": "consumer", "label": "consumer", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "This option allows the user to set a custom resume [...] "seekTo": { "kind": "parameter", "displayName": "Seek To", "group": "consumer", "label": "consumer", "required": false, "type": "string", "javaType": "java.lang.String", "enum": [ "beginning", "end" ], "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Set if KafkaConsumer will read from beginning or end on startup: beginning : read from beginning [...] "sessionTimeoutMs": { "kind": "parameter", "displayName": "Session Timeout Ms", "group": "consumer", "label": "consumer", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "10000", "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The timeout used to detect failures when using Kafka's group management facilities." }, "specificAvroReader": { "kind": "parameter", "displayName": "Specific Avro Reader", "group": "consumer", "label": "confluent,consumer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "This enables the use of a specific Avro reader for use with the Confluent Plat [...] diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java index 74a1cb1..7c75e44 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java @@ -27,7 +27,6 @@ import java.util.stream.Collectors; import org.apache.camel.Exchange; import org.apache.camel.RuntimeCamelException; import org.apache.camel.component.kafka.consumer.KafkaManualCommit; -import org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy; import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer; import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderSerializer; import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer; @@ -149,9 +148,6 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware @UriParam(label = "consumer", defaultValue = "5000", javaType = "java.time.Duration") private Long commitTimeoutMs = 5000L; - @UriParam(label = "consumer") - private KafkaConsumerResumeStrategy resumeStrategy; - // Producer configuration properties @UriParam(label = "producer", defaultValue = KafkaConstants.KAFKA_DEFAULT_PARTITIONER) private String partitioner = KafkaConstants.KAFKA_DEFAULT_PARTITIONER; @@ -802,26 +798,6 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware this.breakOnFirstError = breakOnFirstError; } - public KafkaConsumerResumeStrategy getResumeStrategy() { - return resumeStrategy; - } - - /** - * This option allows the user to set a custom resume strategy. The resume strategy is executed when partitions are - * assigned (i.e.: when connecting or reconnecting). It allows implementations to customize how to resume operations - * and serve as more flexible alternative to the seekTo and the offsetRepository mechanisms. - * - * See the {@link KafkaConsumerResumeStrategy} for implementation details. - * - * This option does not affect the auto commit setting. It is likely that implementations using this setting will - * also want to evaluate using the manual commit option along with this. - * - * @param resumeStrategy An instance of the resume strategy - */ - public void setResumeStrategy(KafkaConsumerResumeStrategy resumeStrategy) { - this.resumeStrategy = resumeStrategy; - } - public String getBrokers() { return brokers; } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java index c2648ce..2db61f1 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ResumeStrategyFactory.java @@ -65,11 +65,6 @@ public final class ResumeStrategyFactory { KafkaConfiguration configuration = kafkaConsumer.getEndpoint().getConfiguration(); - if (configuration.getResumeStrategy() != null) { - LOG.info("Using user-provided strategy"); - return configuration.getResumeStrategy(); - } - return builtinResumeStrategies(configuration); } diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeStrategyIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeStrategyIT.java deleted file mode 100644 index fb8bae5..0000000 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeStrategyIT.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.kafka.integration; - -import java.util.Collections; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import org.apache.camel.BindToRegistry; -import org.apache.camel.EndpointInject; -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy; -import org.apache.camel.component.kafka.consumer.support.KafkaResumable; -import org.apache.camel.component.mock.MockEndpoint; -import org.apache.kafka.clients.consumer.Consumer; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.Timeout; - -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; - -public class KafkaConsumerWithResumeStrategyIT extends BaseEmbeddedKafkaTestSupport { - private static final String TOPIC = "custom-resume"; - - @EndpointInject("mock:result") - private MockEndpoint result; - - @BindToRegistry("resumeStrategy") - private TestKafkaConsumerResumeStrategy resumeStrategy; - private CountDownLatch messagesLatch; - - private static class TestKafkaConsumerResumeStrategy implements KafkaConsumerResumeStrategy { - private final CountDownLatch messagesLatch; - private boolean resumeCalled; - private boolean consumerIsNull = true; - - public TestKafkaConsumerResumeStrategy(CountDownLatch messagesLatch) { - this.messagesLatch = messagesLatch; - } - - @Override - public void setConsumer(Consumer<?, ?> consumer) { - if (consumer != null) { - consumerIsNull = false; - } - } - - @Override - public void resume(KafkaResumable resumable) { - resumeCalled = true; - messagesLatch.countDown(); - } - - @Override - public void resume() { - - } - - public boolean isResumeCalled() { - return resumeCalled; - } - - public boolean isConsumerIsNull() { - return consumerIsNull; - } - - @Override - public void start() throws Exception { - - } - } - - @Override - protected void doPreSetup() { - messagesLatch = new CountDownLatch(1); - resumeStrategy = new TestKafkaConsumerResumeStrategy(messagesLatch); - } - - @Test - @Timeout(value = 30) - public void offsetGetStateMustHaveBeenCalledTwice() throws InterruptedException { - assertTrue(messagesLatch.await(4, TimeUnit.SECONDS), "The resume was not called"); - - assertTrue(resumeStrategy.isResumeCalled(), - "The resume strategy should have been called when the partition was assigned"); - assertFalse(resumeStrategy.isConsumerIsNull(), - "The consumer passed to the strategy should not be null"); - } - - @AfterEach - public void after() { - kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)); - } - - @Override - protected RouteBuilder createRouteBuilder() throws Exception { - return new RouteBuilder() { - @Override - public void configure() throws Exception { - from("kafka:" + TOPIC + "?groupId=" + TOPIC + "_GROUP&autoCommitIntervalMs=1000" - + "&autoOffsetReset=latest" + "&consumersCount=1" - + "&resumeStrategy=#resumeStrategy") - .routeId("resume-strategy-route") - .to("mock:result"); - } - }; - } -} diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java index f12941d..2ef283c 100644 --- a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java @@ -17,6 +17,8 @@ package org.apache.camel.component.file; import java.io.File; +import java.util.HashMap; +import java.util.Map; import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; @@ -25,6 +27,7 @@ import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.file.consumer.GenericFileResumable; import org.apache.camel.component.file.consumer.GenericFileResumeStrategy; import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.resume.Resumables; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -42,6 +45,7 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport @Override public void resume() { + throw new UnsupportedOperationException("Unsupported operation"); // NO-OP } @@ -57,7 +61,11 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedBodiesReceivedInAnyOrder("34567890"); - template.sendBodyAndHeader(fileUri("resumeOff"), "01234567890", Exchange.FILE_NAME, "resume-from-offset.txt"); + Map<String, Object> headers = new HashMap<>(); + headers.put(Exchange.FILE_NAME, "resume-from-offset.txt"); + headers.put("CamelOffset", Resumables.of("resume-from-offset.txt", 3L)); + + template.sendBodyAndHeaders(fileUri("resumeOff"), "01234567890", headers); // only expect 4 of the 6 sent assertMockEndpointsSatisfied(); @@ -79,11 +87,13 @@ public class FileConsumerResumeFromOffsetStrategyTest extends ContextTestSupport protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @Override - public void configure() throws Exception { + public void configure() { - bindToRegistry("testResumeStrategy", new TestResumeStrategy()); + bindToRegistry("resumeStrategy", new TestResumeStrategy()); - from(fileUri("resumeOff?noop=true&recursive=true&resumeStrategy=#testResumeStrategy")) + from(fileUri("resumeOff?noop=true&recursive=true")) + .resumable().header("CamelOffset").resumableStrategyRef("resumeStrategy") + .log("${body}") .convertBodyTo(String.class).to("mock:result"); from(fileUri("resumeNone?noop=true&recursive=true")) diff --git a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java index d70134e..e867cf2 100644 --- a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeStrategyTest.java @@ -18,6 +18,7 @@ package org.apache.camel.component.file; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Objects; import org.apache.camel.ContextTestSupport; @@ -26,6 +27,7 @@ import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.file.consumer.FileResumeSet; import org.apache.camel.component.file.consumer.FileSetResumeStrategy; import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.resume.Resumables; import org.junit.jupiter.api.Test; public class FileConsumerResumeStrategyTest extends ContextTestSupport { @@ -54,18 +56,25 @@ public class FileConsumerResumeStrategyTest extends ContextTestSupport { } } + private static Map<String, Object> headerFor(int num) { + String name = num + ".txt"; + + return Map.of(Exchange.FILE_NAME, name, "id", Resumables.of(name, num)); + } + @Test public void testResume() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedBodiesReceivedInAnyOrder("3", "4", "5", "6"); - template.sendBodyAndHeader(fileUri("resume"), "0", Exchange.FILE_NAME, "0.txt"); - template.sendBodyAndHeader(fileUri("resume"), "1", Exchange.FILE_NAME, "1.txt"); - template.sendBodyAndHeader(fileUri("resume"), "2", Exchange.FILE_NAME, "2.txt"); - template.sendBodyAndHeader(fileUri("resume"), "3", Exchange.FILE_NAME, "3.txt"); - template.sendBodyAndHeader(fileUri("resume"), "4", Exchange.FILE_NAME, "4.txt"); - template.sendBodyAndHeader(fileUri("resume"), "5", Exchange.FILE_NAME, "5.txt"); - template.sendBodyAndHeader(fileUri("resume"), "6", Exchange.FILE_NAME, "6.txt"); + + template.sendBodyAndHeaders(fileUri("resume"), "0", headerFor(0)); + template.sendBodyAndHeaders(fileUri("resume"), "1", headerFor(1)); + template.sendBodyAndHeaders(fileUri("resume"), "2", headerFor(2)); + template.sendBodyAndHeaders(fileUri("resume"), "3", headerFor(3)); + template.sendBodyAndHeaders(fileUri("resume"), "4", headerFor(4)); + template.sendBodyAndHeaders(fileUri("resume"), "5", headerFor(5)); + template.sendBodyAndHeaders(fileUri("resume"), "6", headerFor(6)); // only expect 4 of the 6 sent assertMockEndpointsSatisfied(); @@ -79,7 +88,8 @@ public class FileConsumerResumeStrategyTest extends ContextTestSupport { bindToRegistry("testResumeStrategy", new TestResumeStrategy()); - from(fileUri("resume?noop=true&recursive=true&resumeStrategy=#testResumeStrategy")) + from(fileUri("resume?noop=true&recursive=true")) + .resumable().header("id").resumableStrategyRef("testResumeStrategy") .convertBodyTo(String.class) .to("mock:result"); } diff --git a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2KinesisComponentBuilderFactory.java b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2KinesisComponentBuilderFactory.java index 061c1c0..c6ac5e1 100644 --- a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2KinesisComponentBuilderFactory.java +++ b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/Aws2KinesisComponentBuilderFactory.java @@ -286,24 +286,6 @@ public interface Aws2KinesisComponentBuilderFactory { return this; } /** - * Defines a resume strategy for AWS Kinesis. The default strategy reads - * the sequenceNumber if provided. - * - * The option is a: - * <code>org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy</code> type. - * - * Default: KinesisUserConfigurationResumeStrategy - * Group: consumer - * - * @param resumeStrategy the value to set - * @return the dsl builder - */ - default Aws2KinesisComponentBuilder resumeStrategy( - org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy resumeStrategy) { - doSetProperty("resumeStrategy", resumeStrategy); - return this; - } - /** * The sequence number to start polling from. Required if iteratorType * is set to AFTER_SEQUENCE_NUMBER or AT_SEQUENCE_NUMBER. * @@ -466,7 +448,6 @@ public interface Aws2KinesisComponentBuilderFactory { case "bridgeErrorHandler": ((Kinesis2Component) component).setBridgeErrorHandler((boolean) value); return true; case "iteratorType": getOrCreateConfiguration((Kinesis2Component) component).setIteratorType((software.amazon.awssdk.services.kinesis.model.ShardIteratorType) value); return true; case "maxResultsPerRequest": getOrCreateConfiguration((Kinesis2Component) component).setMaxResultsPerRequest((int) value); return true; - case "resumeStrategy": getOrCreateConfiguration((Kinesis2Component) component).setResumeStrategy((org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy) value); return true; case "sequenceNumber": getOrCreateConfiguration((Kinesis2Component) component).setSequenceNumber((java.lang.String) value); return true; case "shardClosed": getOrCreateConfiguration((Kinesis2Component) component).setShardClosed((org.apache.camel.component.aws2.kinesis.Kinesis2ShardClosedStrategyEnum) value); return true; case "shardId": getOrCreateConfiguration((Kinesis2Component) component).setShardId((java.lang.String) value); return true; diff --git a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java index 7e81195..64703ab 100644 --- a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java +++ b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java @@ -673,30 +673,6 @@ public interface KafkaComponentBuilderFactory { return this; } /** - * This option allows the user to set a custom resume strategy. The - * resume strategy is executed when partitions are assigned (i.e.: when - * connecting or reconnecting). It allows implementations to customize - * how to resume operations and serve as more flexible alternative to - * the seekTo and the offsetRepository mechanisms. See the - * KafkaConsumerResumeStrategy for implementation details. This option - * does not affect the auto commit setting. It is likely that - * implementations using this setting will also want to evaluate using - * the manual commit option along with this. - * - * The option is a: - * <code>org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy</code> type. - * - * Group: consumer - * - * @param resumeStrategy the value to set - * @return the dsl builder - */ - default KafkaComponentBuilder resumeStrategy( - org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy resumeStrategy) { - doSetProperty("resumeStrategy", resumeStrategy); - return this; - } - /** * Set if KafkaConsumer will read from beginning or end on startup: * beginning : read from beginning end : read from end This is replacing * the earlier property seekToBeginning. @@ -2040,7 +2016,6 @@ public interface KafkaComponentBuilderFactory { case "partitionAssignor": getOrCreateConfiguration((KafkaComponent) component).setPartitionAssignor((java.lang.String) value); return true; case "pollOnError": getOrCreateConfiguration((KafkaComponent) component).setPollOnError((org.apache.camel.component.kafka.PollOnError) value); return true; case "pollTimeoutMs": getOrCreateConfiguration((KafkaComponent) component).setPollTimeoutMs((java.lang.Long) value); return true; - case "resumeStrategy": getOrCreateConfiguration((KafkaComponent) component).setResumeStrategy((org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy) value); return true; case "seekTo": getOrCreateConfiguration((KafkaComponent) component).setSeekTo((java.lang.String) value); return true; case "sessionTimeoutMs": getOrCreateConfiguration((KafkaComponent) component).setSessionTimeoutMs((java.lang.Integer) value); return true; case "specificAvroReader": getOrCreateConfiguration((KafkaComponent) component).setSpecificAvroReader((boolean) value); return true; diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/CouchDbEndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/CouchDbEndpointBuilderFactory.java index 5016928..493d4d9 100644 --- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/CouchDbEndpointBuilderFactory.java +++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/CouchDbEndpointBuilderFactory.java @@ -184,42 +184,6 @@ public interface CouchDbEndpointBuilderFactory { return this; } /** - * Sets a custom resume strategy for tracking changes from CouchDB. It - * allows tracking from a specific point (i.e.: since the given update - * sequence, the latest sequence, etc). - * - * The option is a: - * <code>org.apache.camel.component.couchdb.consumer.CouchDbResumeStrategy</code> type. - * - * Group: consumer - * - * @param resumeStrategy the value to set - * @return the dsl builder - */ - default CouchDbEndpointConsumerBuilder resumeStrategy( - org.apache.camel.component.couchdb.consumer.CouchDbResumeStrategy resumeStrategy) { - doSetProperty("resumeStrategy", resumeStrategy); - return this; - } - /** - * Sets a custom resume strategy for tracking changes from CouchDB. It - * allows tracking from a specific point (i.e.: since the given update - * sequence, the latest sequence, etc). - * - * The option will be converted to a - * <code>org.apache.camel.component.couchdb.consumer.CouchDbResumeStrategy</code> type. - * - * Group: consumer - * - * @param resumeStrategy the value to set - * @return the dsl builder - */ - default CouchDbEndpointConsumerBuilder resumeStrategy( - String resumeStrategy) { - doSetProperty("resumeStrategy", resumeStrategy); - return this; - } - /** * Specifies how many revisions are returned in the changes array. The * default, main_only, will only return the current winning revision; * all_docs will return all leaf revisions (including conflicts and diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/FileEndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/FileEndpointBuilderFactory.java index 85ee35a..ed3c98f 100644 --- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/FileEndpointBuilderFactory.java +++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/FileEndpointBuilderFactory.java @@ -2393,44 +2393,6 @@ public interface FileEndpointBuilderFactory { return this; } /** - * Set a resume strategy for files. This makes it possible to define a - * strategy for resuming reading files after the last point before - * stopping the application. See the FileConsumerResumeStrategy for - * implementation details. - * - * The option is a: - * <code>org.apache.camel.component.file.consumer.FileConsumerResumeStrategy</code> type. - * - * Group: consumer (advanced) - * - * @param resumeStrategy the value to set - * @return the dsl builder - */ - default AdvancedFileEndpointConsumerBuilder resumeStrategy( - org.apache.camel.component.file.consumer.FileConsumerResumeStrategy resumeStrategy) { - doSetProperty("resumeStrategy", resumeStrategy); - return this; - } - /** - * Set a resume strategy for files. This makes it possible to define a - * strategy for resuming reading files after the last point before - * stopping the application. See the FileConsumerResumeStrategy for - * implementation details. - * - * The option will be converted to a - * <code>org.apache.camel.component.file.consumer.FileConsumerResumeStrategy</code> type. - * - * Group: consumer (advanced) - * - * @param resumeStrategy the value to set - * @return the dsl builder - */ - default AdvancedFileEndpointConsumerBuilder resumeStrategy( - String resumeStrategy) { - doSetProperty("resumeStrategy", resumeStrategy); - return this; - } - /** * Whether the starting directory must exist. Mind that the autoCreate * option is default enabled, which means the starting directory is * normally auto created if it doesn't exist. You can disable autoCreate diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java index 41a9e2d..5d3cab9 100644 --- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java +++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java @@ -1137,54 +1137,6 @@ public interface KafkaEndpointBuilderFactory { return this; } /** - * This option allows the user to set a custom resume strategy. The - * resume strategy is executed when partitions are assigned (i.e.: when - * connecting or reconnecting). It allows implementations to customize - * how to resume operations and serve as more flexible alternative to - * the seekTo and the offsetRepository mechanisms. See the - * KafkaConsumerResumeStrategy for implementation details. This option - * does not affect the auto commit setting. It is likely that - * implementations using this setting will also want to evaluate using - * the manual commit option along with this. - * - * The option is a: - * <code>org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy</code> type. - * - * Group: consumer - * - * @param resumeStrategy the value to set - * @return the dsl builder - */ - default KafkaEndpointConsumerBuilder resumeStrategy( - org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy resumeStrategy) { - doSetProperty("resumeStrategy", resumeStrategy); - return this; - } - /** - * This option allows the user to set a custom resume strategy. The - * resume strategy is executed when partitions are assigned (i.e.: when - * connecting or reconnecting). It allows implementations to customize - * how to resume operations and serve as more flexible alternative to - * the seekTo and the offsetRepository mechanisms. See the - * KafkaConsumerResumeStrategy for implementation details. This option - * does not affect the auto commit setting. It is likely that - * implementations using this setting will also want to evaluate using - * the manual commit option along with this. - * - * The option will be converted to a - * <code>org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy</code> type. - * - * Group: consumer - * - * @param resumeStrategy the value to set - * @return the dsl builder - */ - default KafkaEndpointConsumerBuilder resumeStrategy( - String resumeStrategy) { - doSetProperty("resumeStrategy", resumeStrategy); - return this; - } - /** * Set if KafkaConsumer will read from beginning or end on startup: * beginning : read from beginning end : read from end This is replacing * the earlier property seekToBeginning. diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Kinesis2EndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Kinesis2EndpointBuilderFactory.java index 17b70b5..d47755f 100644 --- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Kinesis2EndpointBuilderFactory.java +++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/Kinesis2EndpointBuilderFactory.java @@ -436,42 +436,6 @@ public interface Kinesis2EndpointBuilderFactory { return this; } /** - * Defines a resume strategy for AWS Kinesis. The default strategy reads - * the sequenceNumber if provided. - * - * The option is a: - * <code>org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy</code> type. - * - * Default: KinesisUserConfigurationResumeStrategy - * Group: consumer - * - * @param resumeStrategy the value to set - * @return the dsl builder - */ - default Kinesis2EndpointConsumerBuilder resumeStrategy( - org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy resumeStrategy) { - doSetProperty("resumeStrategy", resumeStrategy); - return this; - } - /** - * Defines a resume strategy for AWS Kinesis. The default strategy reads - * the sequenceNumber if provided. - * - * The option will be converted to a - * <code>org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeStrategy</code> type. - * - * Default: KinesisUserConfigurationResumeStrategy - * Group: consumer - * - * @param resumeStrategy the value to set - * @return the dsl builder - */ - default Kinesis2EndpointConsumerBuilder resumeStrategy( - String resumeStrategy) { - doSetProperty("resumeStrategy", resumeStrategy); - return this; - } - /** * If the polling consumer did not poll any files, you can enable this * option to send an empty message (no body) instead. *
