This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new cf80392c9b5 Enhance camel-kafka documentation with Strimzi Oauth. (#16838) cf80392c9b5 is described below commit cf80392c9b5483f60a50d5399b3712e667a9650b Author: Claudio Miranda <clau...@claudius.com.br> AuthorDate: Thu Jan 16 15:09:26 2025 -0300 Enhance camel-kafka documentation with Strimzi Oauth. (#16838) --- .../org/apache/camel/catalog/components/kafka.json | 4 +- .../org/apache/camel/component/kafka/kafka.json | 4 +- .../camel-kafka/src/main/docs/kafka-component.adoc | 58 ++++++++++++++++++++-- .../camel/component/kafka/KafkaConfiguration.java | 5 +- .../dsl/KafkaComponentBuilderFactory.java | 2 +- .../endpoint/dsl/KafkaEndpointBuilderFactory.java | 12 ++--- 6 files changed, 70 insertions(+), 15 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json index 31ccaf7f277..f425fe13671 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/kafka.json @@ -24,7 +24,7 @@ "remote": true }, "componentProperties": { - "additionalProperties": { "index": 0, "kind": "property", "displayName": "Additional Properties", "group": "common", "label": "common", "required": false, "type": "object", "javaType": "java.util.Map<java.lang.String, java.lang.Object>", "prefix": "additionalProperties.", "multiValue": true, "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Sets [...] + "additionalProperties": { "index": 0, "kind": "property", "displayName": "Additional Properties", "group": "common", "label": "common", "required": false, "type": "object", "javaType": "java.util.Map<java.lang.String, java.lang.Object>", "prefix": "additionalProperties.", "multiValue": true, "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Sets [...] "brokers": { "index": 1, "kind": "property", "displayName": "Brokers", "group": "common", "label": "common", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "URL of the Kafka brokers to use. The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP [...] "clientId": { "index": 2, "kind": "property", "displayName": "Client Id", "group": "common", "label": "common", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The client id is a user-specified string sent in each request to help trace calls. It should logically identify the a [...] "configuration": { "index": 3, "kind": "property", "displayName": "Configuration", "group": "common", "label": "", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.KafkaConfiguration", "deprecated": false, "autowired": false, "secret": false, "description": "Allows to pre-configure the Kafka component with common options that the endpoints will reuse." }, @@ -158,7 +158,7 @@ }, "properties": { "topic": { "index": 0, "kind": "path", "displayName": "Topic", "group": "common", "label": "common", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Name of the topic to use. On the consumer you can use comma to separate multiple topics. A producer can on [...] - "additionalProperties": { "index": 1, "kind": "parameter", "displayName": "Additional Properties", "group": "common", "label": "common", "required": false, "type": "object", "javaType": "java.util.Map<java.lang.String, java.lang.Object>", "prefix": "additionalProperties.", "multiValue": true, "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Sets [...] + "additionalProperties": { "index": 1, "kind": "parameter", "displayName": "Additional Properties", "group": "common", "label": "common", "required": false, "type": "object", "javaType": "java.util.Map<java.lang.String, java.lang.Object>", "prefix": "additionalProperties.", "multiValue": true, "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Sets [...] "brokers": { "index": 2, "kind": "parameter", "displayName": "Brokers", "group": "common", "label": "common", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "URL of the Kafka brokers to use. The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VI [...] "clientId": { "index": 3, "kind": "parameter", "displayName": "Client Id", "group": "common", "label": "common", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The client id is a user-specified string sent in each request to help trace calls. It should logically identify the [...] "headerFilterStrategy": { "index": 4, "kind": "parameter", "displayName": "Header Filter Strategy", "group": "common", "label": "common", "required": false, "type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "To use a custom HeaderFilterStrategy to filter header to and from [...] diff --git a/components/camel-kafka/src/generated/resources/META-INF/org/apache/camel/component/kafka/kafka.json b/components/camel-kafka/src/generated/resources/META-INF/org/apache/camel/component/kafka/kafka.json index 31ccaf7f277..f425fe13671 100644 --- a/components/camel-kafka/src/generated/resources/META-INF/org/apache/camel/component/kafka/kafka.json +++ b/components/camel-kafka/src/generated/resources/META-INF/org/apache/camel/component/kafka/kafka.json @@ -24,7 +24,7 @@ "remote": true }, "componentProperties": { - "additionalProperties": { "index": 0, "kind": "property", "displayName": "Additional Properties", "group": "common", "label": "common", "required": false, "type": "object", "javaType": "java.util.Map<java.lang.String, java.lang.Object>", "prefix": "additionalProperties.", "multiValue": true, "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Sets [...] + "additionalProperties": { "index": 0, "kind": "property", "displayName": "Additional Properties", "group": "common", "label": "common", "required": false, "type": "object", "javaType": "java.util.Map<java.lang.String, java.lang.Object>", "prefix": "additionalProperties.", "multiValue": true, "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Sets [...] "brokers": { "index": 1, "kind": "property", "displayName": "Brokers", "group": "common", "label": "common", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "URL of the Kafka brokers to use. The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP [...] "clientId": { "index": 2, "kind": "property", "displayName": "Client Id", "group": "common", "label": "common", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The client id is a user-specified string sent in each request to help trace calls. It should logically identify the a [...] "configuration": { "index": 3, "kind": "property", "displayName": "Configuration", "group": "common", "label": "", "required": false, "type": "object", "javaType": "org.apache.camel.component.kafka.KafkaConfiguration", "deprecated": false, "autowired": false, "secret": false, "description": "Allows to pre-configure the Kafka component with common options that the endpoints will reuse." }, @@ -158,7 +158,7 @@ }, "properties": { "topic": { "index": 0, "kind": "path", "displayName": "Topic", "group": "common", "label": "common", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Name of the topic to use. On the consumer you can use comma to separate multiple topics. A producer can on [...] - "additionalProperties": { "index": 1, "kind": "parameter", "displayName": "Additional Properties", "group": "common", "label": "common", "required": false, "type": "object", "javaType": "java.util.Map<java.lang.String, java.lang.Object>", "prefix": "additionalProperties.", "multiValue": true, "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Sets [...] + "additionalProperties": { "index": 1, "kind": "parameter", "displayName": "Additional Properties", "group": "common", "label": "common", "required": false, "type": "object", "javaType": "java.util.Map<java.lang.String, java.lang.Object>", "prefix": "additionalProperties.", "multiValue": true, "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "Sets [...] "brokers": { "index": 2, "kind": "parameter", "displayName": "Brokers", "group": "common", "label": "common", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "URL of the Kafka brokers to use. The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VI [...] "clientId": { "index": 3, "kind": "parameter", "displayName": "Client Id", "group": "common", "label": "common", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "The client id is a user-specified string sent in each request to help trace calls. It should logically identify the [...] "headerFilterStrategy": { "index": 4, "kind": "parameter", "displayName": "Header Filter Strategy", "group": "common", "label": "common", "required": false, "type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.kafka.KafkaConfiguration", "configurationField": "configuration", "description": "To use a custom HeaderFilterStrategy to filter header to and from [...] diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc index 1f5327d48d0..4c738483ca8 100644 --- a/components/camel-kafka/src/main/docs/kafka-component.adoc +++ b/components/camel-kafka/src/main/docs/kafka-component.adoc @@ -92,7 +92,7 @@ on the component level, which allows controlling which of the strategies to use By default, Camel will poll using the *ERROR_HANDLER* to process exceptions. How Camel handles a message that results in an exception can be altered using the `breakOnFirstError` attribute in the configuration. Instead of continuing to poll the next message, Camel will instead commit the offset so that the message that caused the exception will be retried. -This is similar to the *RETRY* polling strategy above. +This is similar to the *RETRY* polling strategy above. [source,java] ---- @@ -260,7 +260,7 @@ Then the commit will be done in the next consumer loop using the kafka asynchron If you want to use a custom implementation of `KafkaManualCommit` then you can configure a custom `KafkaManualCommitFactory` on the `KafkaComponent` that creates instances of your custom implementation. -When configuring a consumer to use manual commit and a specific `CommitManager` it is important to understand how these influence the behavior +When configuring a consumer to use manual commit and a specific `CommitManager` it is important to understand how these influence the behavior of `breakOnFirstError` [source,java] @@ -274,7 +274,7 @@ kafka.setKafkaManualCommitFactory(new DefaultKafkaManualCommitFactory()); camelContext.addComponent("kafka", kafka); ---- -When the `CommitManager` is left to the default `NoopCommitManager` then `breakOnFirstError` will not automatically commit the offset so that the +When the `CommitManager` is left to the default `NoopCommitManager` then `breakOnFirstError` will not automatically commit the offset so that the message with an error is retried. The consumer must manage that in the route using `KafkaManualCommit`. @@ -362,6 +362,58 @@ static { } ---- +=== Authentication to Kafka + +Kafka supports several ways to authenticate the clients to the server, including plain text, PKI (certificates) over TLS, you can refer to the https://kafka.apache.org/documentation/#security_sasl[Kafka documentation] for a detailed view of the supported mechanisms. The kafka authentication and authorization is based on JAAS, so you must use a JAAS Login Module implementation on the client side. + +This section will outline the main points for the authentication using https://github.com/strimzi/strimzi-kafka-oauth/blob/main/README.md[Strimzi JAAS Login Module] and plain text. The Strimzi OAuth contains several properties to fine tune, the authentication mechanism, so you can set them into the `OAuthBearerLoginModule` section. + +The most basic way to authenticate is using the plain text login module with username and password. Beware that this is unsafe and and we suggest the OAuth over TLS for a fully secure mechanism. + +*Username and Password over TLS* +[source] +---- +camel.component.kafka.security-protocol = SASL_SSL +camel.component.kafka.sasl-mechanism=PLAIN +camel.component.kafka.sasl-jaas-config=org.apache.kafka.common.security.plain.PlainLoginModule required \ + username="my_username" \ + password="my_password"; +---- + +There is the Strimzi OAuth Login Module that supports the more secure OAuth mechanisms, where you can set a refresh token, username/password and client secret. You must understand the Kafka Broker security settings to adequately configure the client security configuration. + +*OAuth Bearer Token with client secret* +[source] +---- +camel.component.kafka.security-protocol = SASL_PLAINTEXT +camel.component.kafka.sasl-mechanism = OAUTHBEARER +camel.component.kafka.sasl-jaas-config = org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ + oauth.client.id="kafka-producer-client" \ + oauth.client.secret="kafka-producer-client-secret" \ + oauth.username.claim="preferred_username" \ + oauth.ssl.truststore.location="docker/certificates/ca-truststore.p12" \ + oauth.ssl.truststore.type="pkcs12" \ + oauth.ssl.truststore.password="changeit" \ + oauth.token.endpoint.uri="https://keycloak:8443/realms/demo/protocol/openid-connect/token" ; +camel.component.kafka.additional-properties[sasl.login.callback.handler.class]=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler +---- + +*OAuth Bearer Token with refresh token* +[source] +---- +camel.component.kafka.security-protocol = SASL_PLAINTEXT +camel.component.kafka.sasl-mechanism = OAUTHBEARER +camel.component.kafka.sasl-jaas-config = org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \ + oauth.client.id="kafka-producer-client" \ + oauth.refresh.token="my_refresh_token" + oauth.username.claim="preferred_username" \ + oauth.ssl.truststore.location="docker/certificates/ca-truststore.p12" \ + oauth.ssl.truststore.type="pkcs12" \ + oauth.ssl.truststore.password="changeit" \ + oauth.token.endpoint.uri="https://keycloak:8443/realms/demo/protocol/openid-connect/token" ; +camel.component.kafka.additional-properties[sasl.login.callback.handler.class]=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler +---- + === Batching Consumer To use a Kafka batching consumer with Camel, an application has to set the configuration `batching` to `true`. diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java index 5377ada9345..ad494a210a2 100755 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java @@ -1909,7 +1909,10 @@ public class KafkaConfiguration implements Cloneable, HeaderFilterStrategyAware * Sets additional properties for either kafka consumer or kafka producer in case they can't be set directly on the * camel configurations (e.g.: new Kafka properties that are not reflected yet in Camel configurations), the * properties have to be prefixed with `additionalProperties.`., e.g.: - * `additionalProperties.transactional.id=12345&additionalProperties.schema.registry.url=http://localhost:8811/avro` + * `additionalProperties.transactional.id=12345&additionalProperties.schema.registry.url=http://localhost:8811/avro`. + * If the properties are set in the `application.properties` file, they must be prefixed with + * `camel.component.kafka.additional-properties` and the property enclosed in square brackets, like this example: + * `camel.component.kafka.additional-properties[delivery.timeout.ms]=15000`. */ public void setAdditionalProperties(Map<String, Object> additionalProperties) { this.additionalProperties = additionalProperties; diff --git a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java index fe0af04e9ec..2add3a3aa11 100644 --- a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java +++ b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/KafkaComponentBuilderFactory.java @@ -56,7 +56,7 @@ public interface KafkaComponentBuilderFactory { * configurations (e.g.: new Kafka properties that are not reflected yet * in Camel configurations), the properties have to be prefixed with * additionalProperties.., e.g.: - * additionalProperties.transactional.id=12345&additionalProperties.schema.registry.url=http://localhost:8811/avro. + * additionalProperties.transactional.id=12345&additionalProperties.schema.registry.url=http://localhost:8811/avro. If the properties are set in the application.properties file, they must be prefixed with camel.component.kafka.additional-properties and the property enclosed in square brackets, like this example: camel.component.kafka.additional-propertiesdelivery.timeout.ms=15000. * * The option is a: <code>java.util.Map&lt;java.lang.String, * java.lang.Object&gt;</code> type. diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java index dbea4bb68b0..5b27a594f08 100644 --- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java +++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/KafkaEndpointBuilderFactory.java @@ -49,7 +49,7 @@ public interface KafkaEndpointBuilderFactory { * configurations (e.g.: new Kafka properties that are not reflected yet * in Camel configurations), the properties have to be prefixed with * additionalProperties.., e.g.: - * additionalProperties.transactional.id=12345&additionalProperties.schema.registry.url=http://localhost:8811/avro. + * additionalProperties.transactional.id=12345&additionalProperties.schema.registry.url=http://localhost:8811/avro. If the properties are set in the application.properties file, they must be prefixed with camel.component.kafka.additional-properties and the property enclosed in square brackets, like this example: camel.component.kafka.additional-propertiesdelivery.timeout.ms=15000. * * The option is a: <code>java.util.Map<java.lang.String, * java.lang.Object></code> type. @@ -73,7 +73,7 @@ public interface KafkaEndpointBuilderFactory { * configurations (e.g.: new Kafka properties that are not reflected yet * in Camel configurations), the properties have to be prefixed with * additionalProperties.., e.g.: - * additionalProperties.transactional.id=12345&additionalProperties.schema.registry.url=http://localhost:8811/avro. + * additionalProperties.transactional.id=12345&additionalProperties.schema.registry.url=http://localhost:8811/avro. If the properties are set in the application.properties file, they must be prefixed with camel.component.kafka.additional-properties and the property enclosed in square brackets, like this example: camel.component.kafka.additional-propertiesdelivery.timeout.ms=15000. * * The option is a: <code>java.util.Map<java.lang.String, * java.lang.Object></code> type. @@ -2148,7 +2148,7 @@ public interface KafkaEndpointBuilderFactory { * configurations (e.g.: new Kafka properties that are not reflected yet * in Camel configurations), the properties have to be prefixed with * additionalProperties.., e.g.: - * additionalProperties.transactional.id=12345&additionalProperties.schema.registry.url=http://localhost:8811/avro. + * additionalProperties.transactional.id=12345&additionalProperties.schema.registry.url=http://localhost:8811/avro. If the properties are set in the application.properties file, they must be prefixed with camel.component.kafka.additional-properties and the property enclosed in square brackets, like this example: camel.component.kafka.additional-propertiesdelivery.timeout.ms=15000. * * The option is a: <code>java.util.Map<java.lang.String, * java.lang.Object></code> type. @@ -2172,7 +2172,7 @@ public interface KafkaEndpointBuilderFactory { * configurations (e.g.: new Kafka properties that are not reflected yet * in Camel configurations), the properties have to be prefixed with * additionalProperties.., e.g.: - * additionalProperties.transactional.id=12345&additionalProperties.schema.registry.url=http://localhost:8811/avro. + * additionalProperties.transactional.id=12345&additionalProperties.schema.registry.url=http://localhost:8811/avro. If the properties are set in the application.properties file, they must be prefixed with camel.component.kafka.additional-properties and the property enclosed in square brackets, like this example: camel.component.kafka.additional-propertiesdelivery.timeout.ms=15000. * * The option is a: <code>java.util.Map<java.lang.String, * java.lang.Object></code> type. @@ -4267,7 +4267,7 @@ public interface KafkaEndpointBuilderFactory { * configurations (e.g.: new Kafka properties that are not reflected yet * in Camel configurations), the properties have to be prefixed with * additionalProperties.., e.g.: - * additionalProperties.transactional.id=12345&additionalProperties.schema.registry.url=http://localhost:8811/avro. + * additionalProperties.transactional.id=12345&additionalProperties.schema.registry.url=http://localhost:8811/avro. If the properties are set in the application.properties file, they must be prefixed with camel.component.kafka.additional-properties and the property enclosed in square brackets, like this example: camel.component.kafka.additional-propertiesdelivery.timeout.ms=15000. * * The option is a: <code>java.util.Map<java.lang.String, * java.lang.Object></code> type. @@ -4291,7 +4291,7 @@ public interface KafkaEndpointBuilderFactory { * configurations (e.g.: new Kafka properties that are not reflected yet * in Camel configurations), the properties have to be prefixed with * additionalProperties.., e.g.: - * additionalProperties.transactional.id=12345&additionalProperties.schema.registry.url=http://localhost:8811/avro. + * additionalProperties.transactional.id=12345&additionalProperties.schema.registry.url=http://localhost:8811/avro. If the properties are set in the application.properties file, they must be prefixed with camel.component.kafka.additional-properties and the property enclosed in square brackets, like this example: camel.component.kafka.additional-propertiesdelivery.timeout.ms=15000. * * The option is a: <code>java.util.Map<java.lang.String, * java.lang.Object></code> type.