This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch feature/CAMEL-23789-kafka-multi-dsl-docs in repository https://gitbox.apache.org/repos/asf/camel.git
commit 52d257351ddbebe6d1a18e26f94bf91c068168ed Author: Claus Ibsen <[email protected]> AuthorDate: Wed Jun 17 18:02:06 2026 +0200 CAMEL-23789: Improve camel-kafka docs with multi-DSL tabs and Java-only markers Co-Authored-By: Claude <[email protected]> Signed-off-by: Claus Ibsen <[email protected]> --- .../camel-kafka/src/main/docs/kafka-component.adoc | 608 ++++++++++++++++----- 1 file changed, 467 insertions(+), 141 deletions(-) diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc index cbc11a29d825..e4a2d1e3e3a5 100644 --- a/components/camel-kafka/src/main/docs/kafka-component.adoc +++ b/components/camel-kafka/src/main/docs/kafka-component.adoc @@ -83,14 +83,23 @@ How Camel handles a message that results in an exception can be altered using th Instead of continuing to poll the next message, Camel will instead commit the offset so that the message that caused the exception will be retried. This is similar to the *RETRY* polling strategy above. +You can configure this on the component level, either programmatically in Java, or via configuration properties: + +._Java-only: programmatic component configuration_ [source,java] ---- KafkaComponent kafka = new KafkaComponent(); kafka.setBreakOnFirstError(true); -... camelContext.addComponent("kafka", kafka); ---- +Or using configuration properties: + +[source,properties] +---- +camel.component.kafka.break-on-first-error=true +---- + It is recommended that you read the section below "Using manual commit with Kafka consumer" to understand how `breakOnFirstError` will work based on the `CommitManager` that is configured. @@ -122,23 +131,18 @@ To use, this repository must be placed in the Camel registry, either manually or Sample usage is as follows: +NOTE: The `KafkaIdempotentRepository` bean must be registered in the Camel registry before it can be referenced by the route. 
+ +._Java-only: registering the bean_ [source,java] ---- KafkaIdempotentRepository kafkaIdempotentRepository = new KafkaIdempotentRepository("idempotent-db-inserts", "localhost:9091"); SimpleRegistry registry = new SimpleRegistry(); -registry.put("insertDbIdemRepo", kafkaIdempotentRepository); // must be registered in the registry, to enable access to the CamelContext -CamelContext context = new CamelContext(registry); - -// later in RouteBuilder... -from("direct:performInsert") - .idempotentConsumer(header("id")).idempotentRepository("insertDbIdemRepo") - // once-only insert into the database - .end() +registry.put("insertDbIdemRepo", kafkaIdempotentRepository); ---- -In XML: - +._XML: registering the bean_ [source,xml] ---- <!-- simple --> @@ -166,18 +170,63 @@ In XML: </bean> ---- +[tabs] +==== +Java:: ++ +[source,java] +---- +from("direct:performInsert") + .idempotentConsumer(header("id")).idempotentRepository("insertDbIdemRepo") + .to("sql:INSERT INTO ...") + .end(); +---- + +XML:: ++ +[source,xml] +---- +<route> + <from uri="direct:performInsert"/> + <idempotentConsumer idempotentRepository="insertDbIdemRepo"> + <header>id</header> + <to uri="sql:INSERT INTO ..."/> + </idempotentConsumer> +</route> +---- + +YAML:: ++ +[source,yaml] +---- +- route: + from: + uri: direct:performInsert + steps: + - idempotentConsumer: + idempotentRepository: "#insertDbIdemRepo" + expression: + header: id + steps: + - to: + uri: "sql:INSERT INTO ..." +---- +==== + There are 3 alternatives to choose from when using idempotency with numeric identifiers. 
The first one is to use the static `numericHeader` method from `org.apache.camel.component.kafka.serde.KafkaSerdeHelper` to perform the conversion for you: +._Java-only: numericHeader helper method_ [source,java] ---- from("direct:performInsert") .idempotentConsumer(numericHeader("id")).idempotentRepository("insertDbIdemRepo") - // once-only insert into the database - .end() + .to("sql:INSERT INTO ...") + .end(); ---- Alternatively, it is possible to use a custom serializer configured via the route URL to perform the conversion: +._Java-only: custom header deserializer class_ [source,java] ---- public class CustomHeaderDeserializer extends DefaultKafkaHeaderDeserializer { @@ -198,18 +247,18 @@ public class CustomHeaderDeserializer extends DefaultKafkaHeaderDeserializer { Lastly, it is also possible to do so in a processor: +._Java-only: processor for header type conversion_ [source,java] ---- -from(from).routeId("foo") +from("kafka:my-topic") .process(exchange -> { byte[] id = exchange.getIn().getHeader("id", byte[].class); - BigInteger bi = new BigInteger(id); exchange.getIn().setHeader("id", String.valueOf(bi.longValue())); }) .idempotentConsumer(header("id")) .idempotentRepository("kafkaIdempotentRepository") - .to(to); + .to("direct:process"); ---- === Manual commits with the Kafka consumer @@ -220,27 +269,38 @@ In case you want to force manual commits, you can use `KafkaManualCommit` API fr This requires turning on manual commits by either setting the option `allowManualCommit` to `true` on the `KafkaComponent` or on the endpoint, for example: +._Java-only: programmatic component configuration_ [source,java] ---- KafkaComponent kafka = new KafkaComponent(); kafka.setAutoCommitEnable(false); kafka.setAllowManualCommit(true); -// ... 
camelContext.addComponent("kafka", kafka); ---- +Or using configuration properties: + +[source,properties] +---- +camel.component.kafka.auto-commit-enable=false +camel.component.kafka.allow-manual-commit=true +---- + By default, it uses the `NoopCommitManager` behind the scenes. To commit an offset, you will -require you to use the `KafkaManualCommit` from Java code such as a Camel `Processor`: +need to use the `KafkaManualCommit` from Java code such as a Camel `Processor`: +._Java-only: manual commit processor_ [source,java] ---- public void process(Exchange exchange) { KafkaManualCommit manual = - exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class); + exchange.getIn().getHeader("CamelKafkaManualCommit", KafkaManualCommit.class); manual.commit(); } ---- +TIP: The header name `CamelKafkaManualCommit` is also available as the constant `KafkaConstants.MANUAL_COMMIT`. + The `KafkaManualCommit` will force a synchronous commit which will block until the commit is acknowledged on Kafka, or if it fails an exception is thrown. You can use an asynchronous commit as well by configuring the `KafkaManualCommitFactory` with the `DefaultKafkaManualAsyncCommitFactory` implementation. @@ -252,6 +312,7 @@ on the `KafkaComponent` that creates instances of your custom implementation. When configuring a consumer to use manual commit and a specific `CommitManager` it is important to understand how these influence the behavior of `breakOnFirstError` +._Java-only: programmatic component configuration_ [source,java] ---- KafkaComponent kafka = new KafkaComponent(); @@ -259,10 +320,19 @@ kafka.setAutoCommitEnable(false); kafka.setAllowManualCommit(true); kafka.setBreakOnFirstError(true); kafka.setKafkaManualCommitFactory(new DefaultKafkaManualCommitFactory()); -... 
camelContext.addComponent("kafka", kafka); ---- +Or using configuration properties: + +[source,properties] +---- +camel.component.kafka.auto-commit-enable=false +camel.component.kafka.allow-manual-commit=true +camel.component.kafka.break-on-first-error=true +camel.component.kafka.kafka-manual-commit-factory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory +---- + When the `CommitManager` is left to the default `NoopCommitManager` then `breakOnFirstError` will not automatically commit the offset so that the message with an error is retried. The consumer must manage that in the route using `KafkaManualCommit`. @@ -281,13 +351,14 @@ exception with the message `KafkaConsumer is not safe for multi-threaded access` The Kafka component supports pausable consumers. This type of consumer can pause consuming data based on conditions external to the component itself, such as an external system being unavailable or other transient conditions. +._Java-only: pausable consumer with lambda_ [source,java] ---- from("kafka:topic") - .pausable(new KafkaConsumerListener(), () -> canContinue()) // the pausable check gets called if the exchange fails to be processed ... + .pausable(new KafkaConsumerListener(), () -> canContinue()) .routeId("pausable-route") - .process(this::process) // Kafka consumer will be paused if this one throws an exception ... - .to("some:destination"); // or this one + .process(this::process) + .to("some:destination"); ---- In this example, consuming messages can pause (by calling the Kafka's Consumer pause method) if the result from `canContinue` is false. @@ -312,24 +383,84 @@ The following header value types are supported: `String`, `Integer`, `Long`, `Do Note: all headers propagated *from* kafka *to* camel exchange will contain `byte[]` value by default. To override default functionality, these uri parameters can be set: `headerDeserializer` for `from` route and `headerSerializer` for `to` route. 
For example: +[tabs] +==== +Java:: ++ [source,java] ---- from("kafka:my_topic?headerDeserializer=#myDeserializer") -... -.to("kafka:my_topic?headerSerializer=#mySerializer") + .to("kafka:my_topic?headerSerializer=#mySerializer"); ---- +XML:: ++ +[source,xml] +---- +<route> + <from uri="kafka:my_topic?headerDeserializer=#myDeserializer"/> + <to uri="kafka:my_topic?headerSerializer=#mySerializer"/> +</route> +---- + +YAML:: ++ +[source,yaml] +---- +- route: + from: + uri: kafka:my_topic + parameters: + headerDeserializer: "#myDeserializer" + steps: + - to: + uri: kafka:my_topic + parameters: + headerSerializer: "#mySerializer" +---- +==== + By default, all headers are being filtered by `KafkaHeaderFilterStrategy`. Strategy filters out headers which start with `Camel` or `org.apache.camel` prefixes. Default strategy can be overridden by using `headerFilterStrategy` uri parameter in both `to` and `from` routes: +[tabs] +==== +Java:: ++ [source,java] ---- from("kafka:my_topic?headerFilterStrategy=#myStrategy") -... -.to("kafka:my_topic?headerFilterStrategy=#myStrategy") + .to("kafka:my_topic?headerFilterStrategy=#myStrategy"); ---- +XML:: ++ +[source,xml] +---- +<route> + <from uri="kafka:my_topic?headerFilterStrategy=#myStrategy"/> + <to uri="kafka:my_topic?headerFilterStrategy=#myStrategy"/> +</route> +---- + +YAML:: ++ +[source,yaml] +---- +- route: + from: + uri: kafka:my_topic + parameters: + headerFilterStrategy: "#myStrategy" + steps: + - to: + uri: kafka:my_topic + parameters: + headerFilterStrategy: "#myStrategy" +---- +==== + `myStrategy` object should be a subclass of `HeaderFilterStrategy` and must be placed in the Camel registry, either manually or by registration as a bean in Spring, as it is `CamelContext` aware. 
=== Kafka Transaction @@ -383,6 +514,7 @@ If both `transacted=true` and `transactionalId` are present, the latter takes pr Configure the 'krb5.conf' file directly through the API: +._Java-only: static Kerberos configuration_ [source,java] ---- static { @@ -390,6 +522,13 @@ static { } ---- +Alternatively, you can set the JVM system property: + +[source,properties] +---- +java.security.krb5.conf=/path/to/config/file +---- + === Authentication to Kafka Kafka supports several ways to authenticate the clients to the server, including plain text, PKI (certificates) over TLS, you can refer to the https://kafka.apache.org/documentation/#security_sasl[Kafka documentation] for a detailed view of the supported mechanisms. The kafka authentication and authorization is based on JAAS, so you must use a JAAS Login Module implementation on the client side. @@ -464,16 +603,45 @@ camel.component.kafka.sasl-password=mypassword *OAuth Authentication Example* +[tabs] +==== +Java:: ++ [source,java] ---- -from("kafka:my-topic?brokers=localhost:9092" + - "&saslAuthType=OAUTH" + - "&oauthClientId=my-client" + - "&oauthClientSecret=my-secret" + - "&oauthTokenEndpointUri=https://auth.example.com/oauth/token") +from("kafka:my-topic?brokers=localhost:9092&saslAuthType=OAUTH&oauthClientId=my-client&oauthClientSecret=my-secret&oauthTokenEndpointUri=https://auth.example.com/oauth/token") .to("log:received"); ---- +XML:: ++ +[source,xml] +---- +<route> + <from uri="kafka:my-topic?brokers=localhost:9092&amp;saslAuthType=OAUTH&amp;oauthClientId=my-client&amp;oauthClientSecret=my-secret&amp;oauthTokenEndpointUri=https://auth.example.com/oauth/token"/> + <to uri="log:received"/> +</route> +---- + +YAML:: ++ +[source,yaml] +---- +- route: + from: + uri: kafka:my-topic + parameters: + brokers: "localhost:9092" + saslAuthType: OAUTH + oauthClientId: my-client + oauthClientSecret: my-secret + oauthTokenEndpointUri: "https://auth.example.com/oauth/token" + steps: + - to: + uri: log:received +---- +==== + *AWS MSK IAM 
Authentication* When using AWS MSK with IAM authentication, ensure the `aws-msk-iam-auth` library is on the classpath: @@ -600,26 +768,23 @@ By default, Camel uses automatic commits when using batch processing. In this ca In case of failures, the records will not be processed. The code below provides an example of this approach: + +._Java-only: batch processing with inline processor_ [source,java] ---- -public void configure() { - from("kafka:topic?groupId=myGroup&pollTimeoutMs=1000&batching=true&maxPollRecords=10&autoOffsetReset=earliest").process(e -> { - // The received records are stored as exchanges in a list. This gets the list of those exchanges +from("kafka:topic?groupId=myGroup&pollTimeoutMs=1000&batching=true&maxPollRecords=10&autoOffsetReset=earliest") + .process(e -> { final List<?> exchanges = e.getMessage().getBody(List.class); - - // Ensure we are actually receiving what we are asking for if (exchanges == null || exchanges.isEmpty()) { return; } - - // The records from the batch are stored in a list of exchanges in the original exchange. To process, we iterate over that list for (Object obj : exchanges) { if (obj instanceof Exchange exchange) { LOG.info("Processing exchange with body {}", exchange.getMessage().getBody(String.class)); } } - }).to(KafkaTestUtil.MOCK_RESULT); -} + }) + .to("mock:result"); ---- ===== Handling Errors with Automatic Commits @@ -630,44 +795,31 @@ It is recommended to implement appropriate error handling mechanisms and pattern The code below provides an example of handling errors with automatic commits: +._Java-only: error handling with batch processing_ [source,java] ---- -public void configure() { - /* - We want to use continued here, so that Camel auto-commits the batch even though part of it has failed. 
In a - production scenario, applications should probably send these records to a separate topic or fix the condition - that lead to the failure - */ - onException(IllegalArgumentException.class).process(exchange -> { - LOG.warn("Failed to process batch {}", exchange.getMessage().getBody()); - LOG.warn("Failed to process due to {}", exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class).getMessage()); - }).continued(true); - - from("kafka:topic?groupId=myGroup&pollTimeoutMs=1000&batching=true&maxPollRecords=10&autoOffsetReset=earliest").process(e -> { - // The received records are stored as exchanges in a list. This gets the list of those exchanges - final List<?> exchanges = e.getMessage().getBody(List.class); +onException(IllegalArgumentException.class).process(exchange -> { + LOG.warn("Failed to process batch {}", exchange.getMessage().getBody()); + LOG.warn("Failed to process due to {}", exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Throwable.class).getMessage()); +}).continued(true); - // Ensure we are actually receiving what we are asking for +from("kafka:topic?groupId=myGroup&pollTimeoutMs=1000&batching=true&maxPollRecords=10&autoOffsetReset=earliest") + .process(e -> { + final List<?> exchanges = e.getMessage().getBody(List.class); if (exchanges == null || exchanges.isEmpty()) { return; } - - // The records from the batch are stored in a list of exchanges in the original exchange. - int i = 0; for (Object o : exchanges) { if (o instanceof Exchange exchange) { - i++; LOG.info("Processing exchange with body {}", exchange.getMessage().getBody(String.class)); - - if (i == 4) { - throw new IllegalArgumentException("Failed to process record"); - } } } - }).to(KafkaTestUtil.MOCK_RESULT); -} + }) + .to("mock:result"); ---- +TIP: In a production scenario, applications should send failed records to a separate topic (dead-letter queue) or fix the condition that led to the failure. 
+ ===== Break on First Error in Batching Mode The `breakOnFirstError` option is also supported in batching mode, providing the same error handling behavior as in streaming mode but applied to batch processing. @@ -775,7 +927,7 @@ onException(Exception.class) .process(exchange -> { // Commit manually when error occurs KafkaManualCommit manual = exchange.getMessage() - .getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class); + .getHeader("CamelKafkaManualCommit", KafkaManualCommit.class); manual.commit(); }); @@ -790,7 +942,7 @@ from("kafka:topic?groupId=myGroup&batching=true&breakOnFirstError=true&autoCommi .process(exchange -> { // Manual commit on successful processing KafkaManualCommit manual = exchange.getMessage() - .getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class); + .getHeader("CamelKafkaManualCommit", KafkaManualCommit.class); manual.commit(); }) .to("mock:result"); @@ -870,7 +1022,7 @@ public class ErrorCommitProcessor implements Processor { LOG.warn("Error occurred, performing manual commit before reconnection"); KafkaManualCommit manual = exchange.getMessage() - .getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class); + .getHeader("CamelKafkaManualCommit", KafkaManualCommit.class); if (manual != null) { manual.commit(); @@ -888,7 +1040,7 @@ public class SuccessCommitProcessor implements Processor { LOG.debug("Batch processed successfully, performing manual commit"); KafkaManualCommit manual = exchange.getMessage() - .getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class); + .getHeader("CamelKafkaManualCommit", KafkaManualCommit.class); if (manual != null) { manual.commit(); @@ -905,33 +1057,25 @@ When working with batch processing with manual commits, it's up to the applicati The code below provides an example of this approach: +._Java-only: batch processing with manual commits_ [source,java] ---- -public void configure() { - 
from("kafka:topic?batching=true&allowManualCommit=true&maxPollRecords=100&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory") +from("kafka:topic?batching=true&allowManualCommit=true&maxPollRecords=100&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory") .process(e -> { - // The received records are stored as exchanges in a list. This gets the list of those exchanges final List<?> exchanges = e.getMessage().getBody(List.class); - - // Ensure we are actually receiving what we are asking for if (exchanges == null || exchanges.isEmpty()) { return; } - - /* - Every exchange in that list should contain a reference to the manual commit object. We use the reference - for the last exchange in the list to commit the whole batch - */ + // Use the last exchange in the list to commit the whole batch final Object tmp = exchanges.getLast(); if (tmp instanceof Exchange exchange) { KafkaManualCommit manual = - exchange.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class); + exchange.getMessage().getHeader("CamelKafkaManualCommit", KafkaManualCommit.class); LOG.debug("Performing manual commit"); manual.commit(); LOG.debug("Done performing manual commit"); } }); -} ---- ==== Dealing with long polling timeouts @@ -942,14 +1086,17 @@ To properly do so, first make sure to have a max polling interval that is higher Then, increase the shutdown timeout to ensure that committing, closing and other Kafka operations are not abruptly aborted. For instance: +._Java-only: programmatic shutdown timeout configuration_ [source,java] ---- -public void configure() { - // Note that this can be configured in other ways - getCamelContext().getShutdownStrategy().setTimeout(10000); +getCamelContext().getShutdownStrategy().setTimeout(10000); +---- - // route setup ... 
-} +Or using configuration properties: + +[source,properties] +---- +camel.main.shutdown-timeout=10000 ---- === Custom Subscription Adapters @@ -957,8 +1104,8 @@ public void configure() { Applications with complex subscription logic may provide a custom bean to handle the subscription process. To do so, it is necessary to implement the interface `SubscribeAdapter`. +._Java-only: custom subscribe adapter class_ [source,java] -.Example subscriber adapter that subscribes to a set of Kafka topics or patterns ---- public class CustomSubscribeAdapter implements SubscribeAdapter { @Override @@ -972,12 +1119,12 @@ public class CustomSubscribeAdapter implements SubscribeAdapter { } ---- -Then, it is necessary to add it as named bean instance to the registry: +Then, it is necessary to add it as a named bean instance to the registry: +._Java-only: registering the bean_ [source,java] -.Add to registry example ---- -context.getRegistry().bind(KafkaConstants.KAFKA_SUBSCRIBE_ADAPTER, new CustomSubscribeAdapter()); +context.getRegistry().bind("subscribeAdapter", new CustomSubscribeAdapter()); ---- === Interoperability @@ -1002,25 +1149,90 @@ To utilize this solution, you need to modify the route URI on the consumer end o `headerDeserializer` option. 
For example: +[tabs] +==== +Java:: ++ [source,java] -.Route snippet ---- from("kafka:topic?headerDeserializer=#class:org.apache.camel.component.kafka.consumer.support.interop.JMSDeserializer") - .to("..."); + .to("direct:process"); ---- +XML:: ++ +[source,xml] +---- +<route> + <from uri="kafka:topic?headerDeserializer=#class:org.apache.camel.component.kafka.consumer.support.interop.JMSDeserializer"/> + <to uri="direct:process"/> +</route> +---- + +YAML:: ++ +[source,yaml] +---- +- route: + from: + uri: kafka:topic + parameters: + headerDeserializer: "#class:org.apache.camel.component.kafka.consumer.support.interop.JMSDeserializer" + steps: + - to: + uri: direct:process +---- +==== + === Producer Performance If the producer is performing too slowly for your needs, you may want to aggregate the exchanges before sending. +[tabs] +==== +Java:: ++ [source,java] -.Route snippet ---- -from("source") - // .other route stuff +from("direct:start") .aggregate(constant(true), new GroupedExchangeAggregationStrategy()) - .to("kafka:topic"); + .to("kafka:my-topic"); +---- + +XML:: ++ +[source,xml] +---- +<route> + <from uri="direct:start"/> + <aggregate aggregationStrategy="#groupedExchange"> + <correlationExpression> + <constant>true</constant> + </correlationExpression> + <to uri="kafka:my-topic"/> + </aggregate> +</route> +---- + +YAML:: ++ +[source,yaml] +---- +- route: + from: + uri: direct:start + steps: + - aggregate: + aggregationStrategy: "#groupedExchange" + correlationExpression: + constant: "true" + steps: + - to: + uri: kafka:my-topic ---- +==== + +TIP: In XML and YAML, register the `GroupedExchangeAggregationStrategy` as a bean named `groupedExchange` in the registry. The reason for this is related to how the producer handles the two different cases: @@ -1183,68 +1395,162 @@ To keep the offsets, the component needs a `StateRepository` implementation such This bean should be available in the registry. 
Here is how to use it: +NOTE: The `FileStateRepository` bean must be registered in the Camel registry before it can be referenced by the route. + +._Java-only: registering the bean_ [source,java] ---- -// Create the repository in which the Kafka offsets will be persisted FileStateRepository repository = FileStateRepository.fileStateRepository(new File("/path/to/repo.dat")); // Bind this repository into the Camel registry Registry registry = createCamelRegistry(); registry.bind("offsetRepo", repository); +---- -// Configure the camel context -DefaultCamelContext camelContext = new DefaultCamelContext(registry); -camelContext.addRoutes(new RouteBuilder() { - @Override - public void configure() throws Exception { - fromF("kafka:%s?brokers=localhost:{{kafkaPort}}" + - // Set up the topic and broker address - "&groupId=A" + - // The consumer processor group ID - "&autoOffsetReset=earliest" + - // Ask to start from the beginning if we have unknown offset - "&offsetRepository=#offsetRepo", TOPIC) - // Keep the offsets in the previously configured repository - .to("mock:result"); - } -}); +[tabs] +==== +Java:: ++ +[source,java] +---- +from("kafka:my-topic?brokers=localhost:9092&groupId=A&autoOffsetReset=earliest&offsetRepository=#offsetRepo") + .to("mock:result"); ---- +XML:: ++ +[source,xml] +---- +<route> + <from uri="kafka:my-topic?brokers=localhost:9092&amp;groupId=A&amp;autoOffsetReset=earliest&amp;offsetRepository=#offsetRepo"/> + <to uri="mock:result"/> +</route> +---- + +YAML:: ++ +[source,yaml] +---- +- route: + from: + uri: kafka:my-topic + parameters: + brokers: "localhost:9092" + groupId: A + autoOffsetReset: earliest + offsetRepository: "#offsetRepo" + steps: + - to: + uri: mock:result +---- +==== + === Producing messages to Kafka Here is the minimal route you need to produce messages to Kafka. 
+[tabs] +==== +Java:: ++ [source,java] ---- from("direct:start") - .setBody(constant("Message from Camel")) // Message to send - .setHeader(KafkaConstants.KEY, constant("Camel")) // Key of the message + .setBody(constant("Message from Camel")) + .setHeader("CamelKafkaKey", constant("Camel")) .to("kafka:test?brokers=localhost:9092"); ---- +XML:: ++ +[source,xml] +---- +<route> + <from uri="direct:start"/> + <setBody> + <constant>Message from Camel</constant> + </setBody> + <setHeader name="CamelKafkaKey"> + <constant>Camel</constant> + </setHeader> + <to uri="kafka:test?brokers=localhost:9092"/> +</route> +---- + +YAML:: ++ +[source,yaml] +---- +- route: + from: + uri: direct:start + steps: + - setBody: + constant: "Message from Camel" + - setHeader: + name: CamelKafkaKey + constant: "Camel" + - to: + uri: kafka:test + parameters: + brokers: "localhost:9092" +---- +==== + +TIP: In Java, you can use the constant `KafkaConstants.KEY` instead of the string `"CamelKafkaKey"`. + === SSL configuration You have two different ways to configure the SSL communication on the Kafka component. 
The first way is through the many SSL endpoint parameters: +[tabs] +==== +Java:: ++ [source,java] ---- -from("kafka:" + TOPIC + "?brokers=localhost:{{kafkaPort}}" + - "&groupId=A" + - "&sslKeystoreLocation=/path/to/keystore.jks" + - "&sslKeystorePassword=changeit" + - "&sslKeyPassword=changeit" + - "&securityProtocol=SSL") - .to("mock:result"); +from("kafka:my-topic?brokers=localhost:9092&groupId=A&sslKeystoreLocation=/path/to/keystore.jks&sslKeystorePassword=changeit&sslKeyPassword=changeit&securityProtocol=SSL") + .to("mock:result"); +---- + +XML:: ++ +[source,xml] +---- +<route> + <from uri="kafka:my-topic?brokers=localhost:9092&amp;groupId=A&amp;sslKeystoreLocation=/path/to/keystore.jks&amp;sslKeystorePassword=changeit&amp;sslKeyPassword=changeit&amp;securityProtocol=SSL"/> + <to uri="mock:result"/> +</route> ---- +YAML:: ++ +[source,yaml] +---- +- route: + from: + uri: kafka:my-topic + parameters: + brokers: "localhost:9092" + groupId: A + sslKeystoreLocation: "/path/to/keystore.jks" + sslKeystorePassword: changeit + sslKeyPassword: changeit + securityProtocol: SSL + steps: + - to: + uri: mock:result +---- +==== + The second way is to use the `sslContextParameters` endpoint parameter: +._Java-only: SSLContextParameters bean setup_ [source,java] ---- -// Configure the SSLContextParameters object KeyStoreParameters ksp = new KeyStoreParameters(); ksp.setResource("/path/to/keystore.jks"); ksp.setPassword("changeit"); @@ -1254,26 +1560,46 @@ kmp.setKeyPassword("changeit"); SSLContextParameters scp = new SSLContextParameters(); scp.setKeyManagers(kmp); -// Bind this SSLContextParameters into the Camel registry Registry registry = createCamelRegistry(); registry.bind("ssl", scp); +---- -// Configure the camel context -DefaultCamelContext camelContext = new DefaultCamelContext(registry); -camelContext.addRoutes(new RouteBuilder() { - @Override - public void configure() throws Exception { - from("kafka:" + TOPIC + "?brokers=localhost:{{kafkaPort}}" + - // Set up the topic and broker 
address - "&groupId=A" + - // The consumer processor group ID - "&sslContextParameters=#ssl" + - // The security protocol - "&securityProtocol=SSL) - // Reference the SSL configuration - .to("mock:result"); - } -}); +[tabs] +==== +Java:: ++ +[source,java] +---- +from("kafka:my-topic?brokers=localhost:9092&groupId=A&sslContextParameters=#ssl&securityProtocol=SSL") + .to("mock:result"); +---- + +XML:: ++ +[source,xml] +---- +<route> + <from uri="kafka:my-topic?brokers=localhost:9092&amp;groupId=A&amp;sslContextParameters=#ssl&amp;securityProtocol=SSL"/> + <to uri="mock:result"/> +</route> +---- + +YAML:: ++ +[source,yaml] +---- +- route: + from: + uri: kafka:my-topic + parameters: + brokers: "localhost:9092" + groupId: A + sslContextParameters: "#ssl" + securityProtocol: SSL + steps: + - to: + uri: mock:result ---- +==== include::spring-boot:partial$starter.adoc[]
