This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch feature/CAMEL-23789-wave1-multi-dsl-docs
in repository https://gitbox.apache.org/repos/asf/camel.git
commit d4808749ee62bf7fe0e394cdef411e5aad387c02
Author: Claus Ibsen <[email protected]>
AuthorDate: Wed Jun 17 19:08:33 2026 +0200

    CAMEL-23789: Make Kafka component docs multi-DSL friendly (Wave 1)

    Co-Authored-By: Claude <[email protected]>
    Signed-off-by: Claus Ibsen <[email protected]>
---
 .../camel-kafka/src/main/docs/kafka-component.adoc | 258 +++++++++++++++++++--
 1 file changed, 235 insertions(+), 23 deletions(-)

diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index cbc11a29d825..be0c7d06a179 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -49,7 +49,7 @@ For more information about Producer/Consumer configuration:
 http://kafka.apache.org/documentation.html#newconsumerconfigs[http://kafka.apache.org/documentation.html#newconsumerconfigs]
 http://kafka.apache.org/documentation.html#producerconfigs[http://kafka.apache.org/documentation.html#producerconfigs]

-If you want to send a message to a dynamic topic then use `KafkaConstants.OVERRIDE_TOPIC` as it is used as a one-time header that is not sent along the message, and actually is removed in the producer.
+If you want to send a message to a dynamic topic then use the `CamelKafkaOverrideTopic` header as it is used as a one-time header that is not sent along the message, and actually is removed in the producer.

 == Usage

@@ -83,6 +83,7 @@ How Camel handles a message that results in an exception can be altered using th
 Instead of continuing to poll the next message, Camel will instead commit the offset so that the message that caused the exception will be retried.
 This is similar to the *RETRY* polling strategy above.

+._Java-only: programmatic `KafkaComponent` configuration_
 [source,java]
 ----
 KafkaComponent kafka = new KafkaComponent();
@@ -122,6 +123,7 @@ To use, this repository must be placed in the Camel registry, either manually or

 Sample usage is as follows:

+._Java-only: programmatic `KafkaIdempotentRepository` setup and registry binding_
 [source,java]
 ----
 KafkaIdempotentRepository kafkaIdempotentRepository = new KafkaIdempotentRepository("idempotent-db-inserts", "localhost:9091");
@@ -168,6 +170,7 @@ In XML:
 There are 3 alternatives to choose from when using idempotency with numeric identifiers.
 The first one is to use the static method `numericHeader` method from `org.apache.camel.component.kafka.serde.KafkaSerdeHelper` to perform the conversion for you:

+._Java-only: uses `numericHeader` Java API_
 [source,java]
 ----
 from("direct:performInsert")
@@ -178,6 +181,7 @@ from("direct:performInsert")

 Alternatively, it is possible to use a custom serializer configured via the route URL to perform the conversion:

+._Java-only: custom `DefaultKafkaHeaderDeserializer` implementation_
 [source,java]
 ----
 public class CustomHeaderDeserializer extends DefaultKafkaHeaderDeserializer {
@@ -198,6 +202,7 @@ public class CustomHeaderDeserializer extends DefaultKafkaHeaderDeserializer {

 Lastly, it is also possible to do so in a processor:

+._Java-only: uses `Processor` to convert header bytes to string_
 [source,java]
 ----
 from(from).routeId("foo")
@@ -220,6 +225,7 @@ In case you want to force manual commits, you can use `KafkaManualCommit` API fr

 This requires turning on manual commits by either setting the option `allowManualCommit` to `true` on the `KafkaComponent` or on the endpoint, for example:

+._Java-only: programmatic `KafkaComponent` configuration_
 [source,java]
 ----
 KafkaComponent kafka = new KafkaComponent();
@@ -232,11 +238,12 @@ camelContext.addComponent("kafka", kafka);
 By default, it uses the `NoopCommitManager` behind the scenes.
 To commit an offset, you will require you to use the `KafkaManualCommit` from Java code such as a Camel `Processor`:

+._Java-only: uses `KafkaManualCommit` Java API to commit offsets_
 [source,java]
 ----
 public void process(Exchange exchange) {
     KafkaManualCommit manual =
-        exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
+        exchange.getIn().getHeader("CamelKafkaManualCommit", KafkaManualCommit.class);
     manual.commit();
 }
 ----
@@ -252,6 +259,7 @@ on the `KafkaComponent` that creates instances of your custom implementation.

 When configuring a consumer to use manual commit and a specific `CommitManager` it is important to understand how these influence the behavior of `breakOnFirstError`

+._Java-only: programmatic `KafkaComponent` configuration with manual commit factory_
 [source,java]
 ----
 KafkaComponent kafka = new KafkaComponent();
@@ -281,6 +289,7 @@ exception with the message `KafkaConsumer is not safe for multi-threaded access`
 The Kafka component supports pausable consumers.
 This type of consumer can pause consuming data based on conditions external to the component itself, such as an external system being unavailable or other transient conditions.

+._Java-only: uses `pausable()` Java DSL with `KafkaConsumerListener`_
 [source,java]
 ----
 from("kafka:topic")
@@ -312,6 +321,10 @@ The following header value types are supported: `String`, `Integer`, `Long`, `Do
 Note: all headers propagated *from* kafka *to* camel exchange will contain `byte[]` value by default.
 To override default functionality, these uri parameters can be set: `headerDeserializer` for `from` route and `headerSerializer` for `to` route. For example:

+[tabs]
+====
+Java::
++
 [source,java]
 ----
 from("kafka:my_topic?headerDeserializer=#myDeserializer")
@@ -319,10 +332,43 @@ from("kafka:my_topic?headerDeserializer=#myDeserializer")
 .to("kafka:my_topic?headerSerializer=#mySerializer")
 ----

+XML::
++
+[source,xml]
+----
+<route>
+  <from uri="kafka:my_topic?headerDeserializer=#myDeserializer"/>
+  <!-- ... -->
+  <to uri="kafka:my_topic?headerSerializer=#mySerializer"/>
+</route>
+----
+
+YAML::
++
+[source,yaml]
+----
+- route:
+    from:
+      uri: kafka:my_topic
+      parameters:
+        headerDeserializer: "#myDeserializer"
+      steps:
+        # ...
+        - to:
+            uri: kafka:my_topic
+            parameters:
+              headerSerializer: "#mySerializer"
+----
+====
+
 By default, all headers are being filtered by `KafkaHeaderFilterStrategy`.
 Strategy filters out headers which start with `Camel` or `org.apache.camel` prefixes.
 Default strategy can be overridden by using `headerFilterStrategy` uri parameter in both `to` and `from` routes:

+[tabs]
+====
+Java::
++
 [source,java]
 ----
 from("kafka:my_topic?headerFilterStrategy=#myStrategy")
@@ -330,6 +376,35 @@ from("kafka:my_topic?headerFilterStrategy=#myStrategy")
 .to("kafka:my_topic?headerFilterStrategy=#myStrategy")
 ----

+XML::
++
+[source,xml]
+----
+<route>
+  <from uri="kafka:my_topic?headerFilterStrategy=#myStrategy"/>
+  <!-- ... -->
+  <to uri="kafka:my_topic?headerFilterStrategy=#myStrategy"/>
+</route>
+----
+
+YAML::
++
+[source,yaml]
+----
+- route:
+    from:
+      uri: kafka:my_topic
+      parameters:
+        headerFilterStrategy: "#myStrategy"
+      steps:
+        # ...
+        - to:
+            uri: kafka:my_topic
+            parameters:
+              headerFilterStrategy: "#myStrategy"
+----
+====
+
 `myStrategy` object should be a subclass of `HeaderFilterStrategy` and must be placed in the Camel registry, either manually or by registration as a bean in Spring, as it is `CamelContext` aware.

 === Kafka Transaction
@@ -383,6 +458,7 @@ If both `transacted=true` and `transactionalId` are present, the latter takes pr

 Configure the 'krb5.conf' file directly through the API:

+._Java-only: static `KafkaComponent` configuration_
 [source,java]
 ----
 static {
@@ -464,16 +540,45 @@ camel.component.kafka.sasl-password=mypassword

 *OAuth Authentication Example*

+[tabs]
+====
+Java::
++
 [source,java]
 ----
-from("kafka:my-topic?brokers=localhost:9092" +
-     "&saslAuthType=OAUTH" +
-     "&oauthClientId=my-client" +
-     "&oauthClientSecret=my-secret" +
-     "&oauthTokenEndpointUri=https://auth.example.com/oauth/token")
+from("kafka:my-topic?brokers=localhost:9092&saslAuthType=OAUTH&oauthClientId=my-client&oauthClientSecret=my-secret&oauthTokenEndpointUri=https://auth.example.com/oauth/token")
     .to("log:received");
 ----

+XML::
++
+[source,xml]
+----
+<route>
+  <from uri="kafka:my-topic?brokers=localhost:9092&saslAuthType=OAUTH&oauthClientId=my-client&oauthClientSecret=my-secret&oauthTokenEndpointUri=https://auth.example.com/oauth/token"/>
+  <to uri="log:received"/>
+</route>
+----
+
+YAML::
++
+[source,yaml]
+----
+- route:
+    from:
+      uri: kafka:my-topic
+      parameters:
+        brokers: "localhost:9092"
+        saslAuthType: OAUTH
+        oauthClientId: my-client
+        oauthClientSecret: my-secret
+        oauthTokenEndpointUri: "https://auth.example.com/oauth/token"
+      steps:
+        - to:
+            uri: log:received
+----
+====
+
 *AWS MSK IAM Authentication*

 When using AWS MSK with IAM authentication, ensure the `aws-msk-iam-auth` library is on the classpath:
@@ -600,6 +705,8 @@ By default, Camel uses automatic commits when using batch processing. In this ca
 In case of failures, the records will not be processed.

 The code below provides an example of this approach:
+
+._Java-only: batch processing with inline `Processor` to iterate over exchanges_
 [source,java]
 ----
 public void configure() {
@@ -630,6 +737,7 @@ It is recommended to implement appropriate error handling mechanisms and pattern

 The code below provides an example of handling errors with automatic commits:

+._Java-only: `onException` and batch processing with inline `Processor`_
 [source,java]
 ----
 public void configure() {
@@ -775,7 +883,7 @@ onException(Exception.class)
     .process(exchange -> {
         // Commit manually when error occurs
         KafkaManualCommit manual = exchange.getMessage()
-            .getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
+            .getHeader("CamelKafkaManualCommit", KafkaManualCommit.class);
         manual.commit();
     });

@@ -790,7 +898,7 @@ from("kafka:topic?groupId=myGroup&batching=true&breakOnFirstError=true&autoCommi
     .process(exchange -> {
         // Manual commit on successful processing
         KafkaManualCommit manual = exchange.getMessage()
-            .getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
+            .getHeader("CamelKafkaManualCommit", KafkaManualCommit.class);
         manual.commit();
     })
     .to("mock:result");
@@ -870,7 +978,7 @@ public class ErrorCommitProcessor implements Processor {
         LOG.warn("Error occurred, performing manual commit before reconnection");

         KafkaManualCommit manual = exchange.getMessage()
-            .getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
+            .getHeader("CamelKafkaManualCommit", KafkaManualCommit.class);

         if (manual != null) {
             manual.commit();
@@ -888,7 +996,7 @@ public class SuccessCommitProcessor implements Processor {
         LOG.debug("Batch processed successfully, performing manual commit");

         KafkaManualCommit manual = exchange.getMessage()
-            .getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
+            .getHeader("CamelKafkaManualCommit", KafkaManualCommit.class);

         if (manual != null) {
             manual.commit();
@@ -905,6 +1013,7 @@ When working with batch processing with manual commits, it's up to the applicati

 The code below provides an example of this approach:

+._Java-only: batch processing with `KafkaManualCommit` to commit the whole batch_
 [source,java]
 ----
 public void configure() {
@@ -925,7 +1034,7 @@ public void configure() {
             final Object tmp = exchanges.getLast();
             if (tmp instanceof Exchange exchange) {
                 KafkaManualCommit manual =
-                        exchange.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
+                        exchange.getMessage().getHeader("CamelKafkaManualCommit", KafkaManualCommit.class);
                 LOG.debug("Performing manual commit");
                 manual.commit();
                 LOG.debug("Done performing manual commit");
@@ -942,6 +1051,7 @@ To properly do so, first make sure to have a max polling interval that is higher
 Then, increase the shutdown timeout to ensure that committing, closing and other Kafka operations are not abruptly aborted.
 For instance:

+._Java-only: programmatic shutdown strategy configuration_
 [source,java]
 ----
 public void configure() {
@@ -957,6 +1067,7 @@ public void configure() {
 Applications with complex subscription logic may provide a custom bean to handle the subscription process.
 To so, it is necessary to implement the interface `SubscribeAdapter`.

+._Java-only: custom `SubscribeAdapter` implementation_
 [source,java]
 .Example subscriber adapter that subscribes to a set of Kafka topics or patterns
 ----
@@ -974,10 +1085,11 @@ public class CustomSubscribeAdapter implements SubscribeAdapter {

 Then, it is necessary to add it as named bean instance to the registry:

+._Java-only: programmatic registry binding_
 [source,java]
 .Add to registry example
 ----
-context.getRegistry().bind(KafkaConstants.KAFKA_SUBSCRIBE_ADAPTER, new CustomSubscribeAdapter());
+context.getRegistry().bind("subscribeAdapter", new CustomSubscribeAdapter());
 ----

 === Interoperability
@@ -1002,17 +1114,46 @@ To utilize this solution, you need to modify the route URI on the consumer end o
 `headerDeserializer` option.
 For example:

+[tabs]
+====
+Java::
++
 [source,java]
-.Route snippet
 ----
 from("kafka:topic?headerDeserializer=#class:org.apache.camel.component.kafka.consumer.support.interop.JMSDeserializer")
     .to("...");
 ----

+XML::
++
+[source,xml]
+----
+<route>
+  <from uri="kafka:topic?headerDeserializer=#class:org.apache.camel.component.kafka.consumer.support.interop.JMSDeserializer"/>
+  <to uri="..."/>
+</route>
+----
+
+YAML::
++
+[source,yaml]
+----
+- route:
+    from:
+      uri: kafka:topic
+      parameters:
+        headerDeserializer: "#class:org.apache.camel.component.kafka.consumer.support.interop.JMSDeserializer"
+      steps:
+        - to:
+            uri: "..."
+----
+====
+
 === Producer Performance

 If the producer is performing too slowly for your needs, you may want to aggregate the exchanges before sending.

+._Java-only: uses `GroupedExchangeAggregationStrategy` which requires Java_
 [source,java]
 .Route snippet
 ----
@@ -1183,6 +1324,7 @@ To keep the offsets, the component needs a `StateRepository` implementation such
 This bean should be available in the registry.
 Here how to use it :

+._Java-only: programmatic `FileStateRepository` setup and registry binding_
 [source,java]
 ----
 // Create the repository in which the Kafka offsets will be persisted
@@ -1215,33 +1357,103 @@ camelContext.addRoutes(new RouteBuilder() {

 Here is the minimal route you need to produce messages to Kafka.

+[tabs]
+====
+Java::
++
 [source,java]
 ----
 from("direct:start")
-    .setBody(constant("Message from Camel")) // Message to send
-    .setHeader(KafkaConstants.KEY, constant("Camel")) // Key of the message
+    .setBody(constant("Message from Camel"))
+    .setHeader("CamelKafkaKey", constant("Camel"))
     .to("kafka:test?brokers=localhost:9092");
 ----

+XML::
++
+[source,xml]
+----
+<route>
+  <from uri="direct:start"/>
+  <setBody>
+    <constant>Message from Camel</constant>
+  </setBody>
+  <setHeader name="CamelKafkaKey">
+    <constant>Camel</constant>
+  </setHeader>
+  <to uri="kafka:test?brokers=localhost:9092"/>
+</route>
+----
+
+YAML::
++
+[source,yaml]
+----
+- route:
+    from:
+      uri: direct:start
+      steps:
+        - setBody:
+            constant: "Message from Camel"
+        - setHeader:
+            name: CamelKafkaKey
+            constant: "Camel"
+        - to:
+            uri: kafka:test
+            parameters:
+              brokers: "localhost:9092"
+----
+====
+
 === SSL configuration

 You have two different ways to configure the SSL communication on the Kafka component.

 The first way is through the many SSL endpoint parameters:

+[tabs]
+====
+Java::
++
 [source,java]
 ----
-from("kafka:" + TOPIC + "?brokers=localhost:{{kafkaPort}}" +
-             "&groupId=A" +
-             "&sslKeystoreLocation=/path/to/keystore.jks" +
-             "&sslKeystorePassword=changeit" +
-             "&sslKeyPassword=changeit" +
-             "&securityProtocol=SSL")
-        .to("mock:result");
+from("kafka:myTopic?brokers=localhost:9092&groupId=A&sslKeystoreLocation=/path/to/keystore.jks&sslKeystorePassword=changeit&sslKeyPassword=changeit&securityProtocol=SSL")
+    .to("mock:result");
 ----

+XML::
++
+[source,xml]
+----
+<route>
+  <from uri="kafka:myTopic?brokers=localhost:9092&groupId=A&sslKeystoreLocation=/path/to/keystore.jks&sslKeystorePassword=changeit&sslKeyPassword=changeit&securityProtocol=SSL"/>
+  <to uri="mock:result"/>
+</route>
+----
+
+YAML::
++
+[source,yaml]
+----
+- route:
+    from:
+      uri: kafka:myTopic
+      parameters:
+        brokers: "localhost:9092"
+        groupId: A
+        sslKeystoreLocation: /path/to/keystore.jks
+        sslKeystorePassword: changeit
+        sslKeyPassword: changeit
+        securityProtocol: SSL
+      steps:
+        - to:
+            uri: mock:result
+----
+====
+
 The second way is to use the `sslContextParameters` endpoint parameter:

+._Java-only: programmatic `SSLContextParameters` setup and registry binding_
 [source,java]
 ----
 // Configure the SSLContextParameters object
