This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch feature/CAMEL-23789-wave1-multi-dsl-docs
in repository https://gitbox.apache.org/repos/asf/camel.git

commit d4808749ee62bf7fe0e394cdef411e5aad387c02
Author: Claus Ibsen <[email protected]>
AuthorDate: Wed Jun 17 19:08:33 2026 +0200

    CAMEL-23789: Make Kafka component docs multi-DSL friendly (Wave 1)
    
    Co-Authored-By: Claude <[email protected]>
    Signed-off-by: Claus Ibsen <[email protected]>
---
 .../camel-kafka/src/main/docs/kafka-component.adoc | 258 +++++++++++++++++++--
 1 file changed, 235 insertions(+), 23 deletions(-)

diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc 
b/components/camel-kafka/src/main/docs/kafka-component.adoc
index cbc11a29d825..be0c7d06a179 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -49,7 +49,7 @@ For more information about Producer/Consumer configuration:
 
http://kafka.apache.org/documentation.html#newconsumerconfigs[http://kafka.apache.org/documentation.html#newconsumerconfigs]
 
http://kafka.apache.org/documentation.html#producerconfigs[http://kafka.apache.org/documentation.html#producerconfigs]
 
-If you want to send a message to a dynamic topic then use 
`KafkaConstants.OVERRIDE_TOPIC` as it is used as a one-time header that is not 
sent along the message, and actually is removed in the producer.
+If you want to send a message to a dynamic topic then use the 
`CamelKafkaOverrideTopic` header as it is used as a one-time header that is not 
sent along the message, and actually is removed in the producer.
 
 == Usage
 
@@ -83,6 +83,7 @@ How Camel handles a message that results in an exception can 
be altered using th
 Instead of continuing to poll the next message, Camel will instead commit the 
offset so that the message that caused the exception will be retried.
 This is similar to the *RETRY* polling strategy above.
 
+._Java-only: programmatic `KafkaComponent` configuration_
 [source,java]
 ----
 KafkaComponent kafka = new KafkaComponent();
@@ -122,6 +123,7 @@ To use, this repository must be placed in the Camel 
registry, either manually or
 
 Sample usage is as follows:
 
+._Java-only: programmatic `KafkaIdempotentRepository` setup and registry 
binding_
 [source,java]
 ----
 KafkaIdempotentRepository kafkaIdempotentRepository = new 
KafkaIdempotentRepository("idempotent-db-inserts", "localhost:9091");
@@ -168,6 +170,7 @@ In XML:
 
 There are 3 alternatives to choose from when using idempotency with numeric 
identifiers. The first one is to use the static method `numericHeader` method 
from `org.apache.camel.component.kafka.serde.KafkaSerdeHelper` to perform the 
conversion for you:
 
+._Java-only: uses `numericHeader` Java API_
 [source,java]
 ----
 from("direct:performInsert")
@@ -178,6 +181,7 @@ from("direct:performInsert")
 
 Alternatively, it is possible to use a custom serializer configured via the 
route URL to perform the conversion:
 
+._Java-only: custom `DefaultKafkaHeaderDeserializer` implementation_
 [source,java]
 ----
 public class CustomHeaderDeserializer extends DefaultKafkaHeaderDeserializer {
@@ -198,6 +202,7 @@ public class CustomHeaderDeserializer extends 
DefaultKafkaHeaderDeserializer {
 
 Lastly, it is also possible to do so in a processor:
 
+._Java-only: uses `Processor` to convert header bytes to string_
 [source,java]
 ----
 from(from).routeId("foo")
@@ -220,6 +225,7 @@ In case you want to force manual commits, you can use 
`KafkaManualCommit` API fr
 This requires turning on manual commits by either setting the option 
`allowManualCommit` to `true` on the `KafkaComponent`
 or on the endpoint, for example:
 
+._Java-only: programmatic `KafkaComponent` configuration_
 [source,java]
 ----
 KafkaComponent kafka = new KafkaComponent();
@@ -232,11 +238,12 @@ camelContext.addComponent("kafka", kafka);
 By default, it uses the `NoopCommitManager` behind the scenes. To commit an 
offset, you will
 require you to use the `KafkaManualCommit` from Java code such as a Camel 
`Processor`:
 
+._Java-only: uses `KafkaManualCommit` Java API to commit offsets_
 [source,java]
 ----
 public void process(Exchange exchange) {
     KafkaManualCommit manual =
-        exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, 
KafkaManualCommit.class);
+        exchange.getIn().getHeader("CamelKafkaManualCommit", 
KafkaManualCommit.class);
     manual.commit();
 }
 ----
@@ -252,6 +259,7 @@ on the `KafkaComponent` that creates instances of your 
custom implementation.
 When configuring a consumer to use manual commit and a specific 
`CommitManager` it is important to understand how these influence the behavior
 of `breakOnFirstError`
 
+._Java-only: programmatic `KafkaComponent` configuration with manual commit 
factory_
 [source,java]
 ----
 KafkaComponent kafka = new KafkaComponent();
@@ -281,6 +289,7 @@ exception with the message `KafkaConsumer is not safe for 
multi-threaded access`
 The Kafka component supports pausable consumers. This type of consumer can 
pause consuming data based on
 conditions external to the component itself, such as an external system being 
unavailable or other transient conditions.
 
+._Java-only: uses `pausable()` Java DSL with `KafkaConsumerListener`_
 [source,java]
 ----
 from("kafka:topic")
@@ -312,6 +321,10 @@ The following header value types are supported: `String`, 
`Integer`, `Long`, `Do
 Note: all headers propagated *from* kafka *to* camel exchange will contain 
`byte[]` value by default.
 To override default functionality, these uri parameters can be set: 
`headerDeserializer` for `from` route and `headerSerializer` for `to` route. 
For example:
 
+[tabs]
+====
+Java::
++
 [source,java]
 ----
 from("kafka:my_topic?headerDeserializer=#myDeserializer")
@@ -319,10 +332,43 @@ from("kafka:my_topic?headerDeserializer=#myDeserializer")
 .to("kafka:my_topic?headerSerializer=#mySerializer")
 ----
 
+XML::
++
+[source,xml]
+----
+<route>
+  <from uri="kafka:my_topic?headerDeserializer=#myDeserializer"/>
+  <!-- ... -->
+  <to uri="kafka:my_topic?headerSerializer=#mySerializer"/>
+</route>
+----
+
+YAML::
++
+[source,yaml]
+----
+- route:
+    from:
+      uri: kafka:my_topic
+      parameters:
+        headerDeserializer: "#myDeserializer"
+      steps:
+        # ...
+        - to:
+            uri: kafka:my_topic
+            parameters:
+              headerSerializer: "#mySerializer"
+----
+====
+
 By default, all headers are being filtered by `KafkaHeaderFilterStrategy`.
 Strategy filters out headers which start with `Camel` or `org.apache.camel` 
prefixes.
 Default strategy can be overridden by using `headerFilterStrategy` uri 
parameter in both `to` and `from` routes:
 
+[tabs]
+====
+Java::
++
 [source,java]
 ----
 from("kafka:my_topic?headerFilterStrategy=#myStrategy")
@@ -330,6 +376,35 @@ from("kafka:my_topic?headerFilterStrategy=#myStrategy")
 .to("kafka:my_topic?headerFilterStrategy=#myStrategy")
 ----
 
+XML::
++
+[source,xml]
+----
+<route>
+  <from uri="kafka:my_topic?headerFilterStrategy=#myStrategy"/>
+  <!-- ... -->
+  <to uri="kafka:my_topic?headerFilterStrategy=#myStrategy"/>
+</route>
+----
+
+YAML::
++
+[source,yaml]
+----
+- route:
+    from:
+      uri: kafka:my_topic
+      parameters:
+        headerFilterStrategy: "#myStrategy"
+      steps:
+        # ...
+        - to:
+            uri: kafka:my_topic
+            parameters:
+              headerFilterStrategy: "#myStrategy"
+----
+====
+
 `myStrategy` object should be a subclass of `HeaderFilterStrategy` and must be 
placed in the Camel registry, either manually or by registration as a bean in 
Spring, as it is `CamelContext` aware.
 
 === Kafka Transaction
@@ -383,6 +458,7 @@ If both `transacted=true` and `transactionalId` are 
present, the latter takes pr
 
 Configure the 'krb5.conf' file directly through the API:
 
+._Java-only: static `KafkaComponent` configuration_
 [source,java]
 ----
 static {
@@ -464,16 +540,45 @@ camel.component.kafka.sasl-password=mypassword
 
 *OAuth Authentication Example*
 
+[tabs]
+====
+Java::
++
 [source,java]
 ----
-from("kafka:my-topic?brokers=localhost:9092" +
-     "&saslAuthType=OAUTH" +
-     "&oauthClientId=my-client" +
-     "&oauthClientSecret=my-secret" +
-     "&oauthTokenEndpointUri=https://auth.example.com/oauth/token";)
+from("kafka:my-topic?brokers=localhost:9092&saslAuthType=OAUTH&oauthClientId=my-client&oauthClientSecret=my-secret&oauthTokenEndpointUri=https://auth.example.com/oauth/token";)
     .to("log:received");
 ----
 
+XML::
++
+[source,xml]
+----
+<route>
+  <from 
uri="kafka:my-topic?brokers=localhost:9092&amp;saslAuthType=OAUTH&amp;oauthClientId=my-client&amp;oauthClientSecret=my-secret&amp;oauthTokenEndpointUri=https://auth.example.com/oauth/token"/>
+  <to uri="log:received"/>
+</route>
+----
+
+YAML::
++
+[source,yaml]
+----
+- route:
+    from:
+      uri: kafka:my-topic
+      parameters:
+        brokers: "localhost:9092"
+        saslAuthType: OAUTH
+        oauthClientId: my-client
+        oauthClientSecret: my-secret
+        oauthTokenEndpointUri: "https://auth.example.com/oauth/token";
+      steps:
+        - to:
+            uri: log:received
+----
+====
+
 *AWS MSK IAM Authentication*
 
 When using AWS MSK with IAM authentication, ensure the `aws-msk-iam-auth` 
library is on the classpath:
@@ -600,6 +705,8 @@ By default, Camel uses automatic commits when using batch 
processing. In this ca
 In case of failures, the records will not be processed.
 
 The code below provides an example of this approach:
+
+._Java-only: batch processing with inline `Processor` to iterate over 
exchanges_
 [source,java]
 ----
 public void configure() {
@@ -630,6 +737,7 @@ It is recommended to implement appropriate error handling 
mechanisms and pattern
 
 The code below provides an example of handling errors with automatic commits:
 
+._Java-only: `onException` and batch processing with inline `Processor`_
 [source,java]
 ----
 public void configure() {
@@ -775,7 +883,7 @@ onException(Exception.class)
     .process(exchange -> {
         // Commit manually when error occurs
         KafkaManualCommit manual = exchange.getMessage()
-            .getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
+            .getHeader("CamelKafkaManualCommit", KafkaManualCommit.class);
         manual.commit();
     });
 
@@ -790,7 +898,7 @@ 
from("kafka:topic?groupId=myGroup&batching=true&breakOnFirstError=true&autoCommi
     .process(exchange -> {
         // Manual commit on successful processing
         KafkaManualCommit manual = exchange.getMessage()
-            .getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
+            .getHeader("CamelKafkaManualCommit", KafkaManualCommit.class);
         manual.commit();
     })
     .to("mock:result");
@@ -870,7 +978,7 @@ public class ErrorCommitProcessor implements Processor {
         LOG.warn("Error occurred, performing manual commit before 
reconnection");
 
         KafkaManualCommit manual = exchange.getMessage()
-            .getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
+            .getHeader("CamelKafkaManualCommit", KafkaManualCommit.class);
 
         if (manual != null) {
             manual.commit();
@@ -888,7 +996,7 @@ public class SuccessCommitProcessor implements Processor {
         LOG.debug("Batch processed successfully, performing manual commit");
 
         KafkaManualCommit manual = exchange.getMessage()
-            .getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
+            .getHeader("CamelKafkaManualCommit", KafkaManualCommit.class);
 
         if (manual != null) {
             manual.commit();
@@ -905,6 +1013,7 @@ When working with batch processing with manual commits, 
it's up to the applicati
 
 The code below provides an example of this approach:
 
+._Java-only: batch processing with `KafkaManualCommit` to commit the whole 
batch_
 [source,java]
 ----
 public void configure() {
@@ -925,7 +1034,7 @@ public void configure() {
         final Object tmp = exchanges.getLast();
         if (tmp instanceof Exchange exchange) {
             KafkaManualCommit manual =
-                    
exchange.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, 
KafkaManualCommit.class);
+                    exchange.getMessage().getHeader("CamelKafkaManualCommit", 
KafkaManualCommit.class);
             LOG.debug("Performing manual commit");
             manual.commit();
             LOG.debug("Done performing manual commit");
@@ -942,6 +1051,7 @@ To properly do so, first make sure to have a max polling 
interval that is higher
 
 Then, increase the shutdown timeout to ensure that committing, closing and 
other Kafka operations are not abruptly aborted. For instance:
 
+._Java-only: programmatic shutdown strategy configuration_
 [source,java]
 ----
 public void configure() {
@@ -957,6 +1067,7 @@ public void configure() {
 Applications with complex subscription logic may provide a custom bean to 
handle the subscription process. To so, it is
 necessary to implement the interface `SubscribeAdapter`.
 
+._Java-only: custom `SubscribeAdapter` implementation_
 [source,java]
 .Example subscriber adapter that subscribes to a set of Kafka topics or 
patterns
 ----
@@ -974,10 +1085,11 @@ public class CustomSubscribeAdapter implements 
SubscribeAdapter {
 
 Then, it is necessary to add it as named bean instance to the registry:
 
+._Java-only: programmatic registry binding_
 [source,java]
 .Add to registry example
 ----
-context.getRegistry().bind(KafkaConstants.KAFKA_SUBSCRIBE_ADAPTER, new 
CustomSubscribeAdapter());
+context.getRegistry().bind("subscribeAdapter", new CustomSubscribeAdapter());
 ----
 
 === Interoperability
@@ -1002,17 +1114,46 @@ To utilize this solution, you need to modify the route 
URI on the consumer end o
 `headerDeserializer` option.
 For example:
 
+[tabs]
+====
+Java::
++
 [source,java]
-.Route snippet
 ----
 
from("kafka:topic?headerDeserializer=#class:org.apache.camel.component.kafka.consumer.support.interop.JMSDeserializer")
     .to("...");
 ----
 
+XML::
++
+[source,xml]
+----
+<route>
+  <from 
uri="kafka:topic?headerDeserializer=#class:org.apache.camel.component.kafka.consumer.support.interop.JMSDeserializer"/>
+  <to uri="..."/>
+</route>
+----
+
+YAML::
++
+[source,yaml]
+----
+- route:
+    from:
+      uri: kafka:topic
+      parameters:
+        headerDeserializer: 
"#class:org.apache.camel.component.kafka.consumer.support.interop.JMSDeserializer"
+      steps:
+        - to:
+            uri: "..."
+----
+====
+
 === Producer Performance
 
 If the producer is performing too slowly for your needs, you may want to 
aggregate the exchanges before sending.
 
+._Java-only: uses `GroupedExchangeAggregationStrategy` which requires Java_
 [source,java]
 .Route snippet
 ----
@@ -1183,6 +1324,7 @@ To keep the offsets, the component needs a 
`StateRepository` implementation such
 This bean should be available in the registry.
 Here how to use it :
 
+._Java-only: programmatic `FileStateRepository` setup and registry binding_
 [source,java]
 ----
 // Create the repository in which the Kafka offsets will be persisted
@@ -1215,33 +1357,103 @@ camelContext.addRoutes(new RouteBuilder() {
 
 Here is the minimal route you need to produce messages to Kafka.
 
+[tabs]
+====
+Java::
++
 [source,java]
 ----
 from("direct:start")
-    .setBody(constant("Message from Camel"))          // Message to send
-    .setHeader(KafkaConstants.KEY, constant("Camel")) // Key of the message
+    .setBody(constant("Message from Camel"))
+    .setHeader("CamelKafkaKey", constant("Camel"))
     .to("kafka:test?brokers=localhost:9092");
 ----
 
+XML::
++
+[source,xml]
+----
+<route>
+  <from uri="direct:start"/>
+  <setBody>
+    <constant>Message from Camel</constant>
+  </setBody>
+  <setHeader name="CamelKafkaKey">
+    <constant>Camel</constant>
+  </setHeader>
+  <to uri="kafka:test?brokers=localhost:9092"/>
+</route>
+----
+
+YAML::
++
+[source,yaml]
+----
+- route:
+    from:
+      uri: direct:start
+      steps:
+        - setBody:
+            constant: "Message from Camel"
+        - setHeader:
+            name: CamelKafkaKey
+            constant: "Camel"
+        - to:
+            uri: kafka:test
+            parameters:
+              brokers: "localhost:9092"
+----
+====
+
 === SSL configuration
 
 You have two different ways to configure the SSL communication on the Kafka 
component.
 
 The first way is through the many SSL endpoint parameters:
 
+[tabs]
+====
+Java::
++
 [source,java]
 ----
-from("kafka:" + TOPIC + "?brokers=localhost:{{kafkaPort}}" +
-             "&groupId=A" +
-             "&sslKeystoreLocation=/path/to/keystore.jks" +
-             "&sslKeystorePassword=changeit" +
-             "&sslKeyPassword=changeit" +
-             "&securityProtocol=SSL")
-        .to("mock:result");
+from("kafka:myTopic?brokers=localhost:9092&groupId=A&sslKeystoreLocation=/path/to/keystore.jks&sslKeystorePassword=changeit&sslKeyPassword=changeit&securityProtocol=SSL")
+    .to("mock:result");
 ----
 
+XML::
++
+[source,xml]
+----
+<route>
+  <from 
uri="kafka:myTopic?brokers=localhost:9092&amp;groupId=A&amp;sslKeystoreLocation=/path/to/keystore.jks&amp;sslKeystorePassword=changeit&amp;sslKeyPassword=changeit&amp;securityProtocol=SSL"/>
+  <to uri="mock:result"/>
+</route>
+----
+
+YAML::
++
+[source,yaml]
+----
+- route:
+    from:
+      uri: kafka:myTopic
+      parameters:
+        brokers: "localhost:9092"
+        groupId: A
+        sslKeystoreLocation: /path/to/keystore.jks
+        sslKeystorePassword: changeit
+        sslKeyPassword: changeit
+        securityProtocol: SSL
+      steps:
+        - to:
+            uri: mock:result
+----
+====
+
 The second way is to use the `sslContextParameters` endpoint parameter:
 
+._Java-only: programmatic `SSLContextParameters` setup and registry binding_
 [source,java]
 ----
 // Configure the SSLContextParameters object

Reply via email to