This is an automated email from the ASF dual-hosted git repository. gnodet pushed a commit to branch feat/camel-22760-kafka-spring-boot-properties in repository https://gitbox.apache.org/repos/asf/camel-spring-boot.git
commit afa9b4e105b4d21f3f37a7aeb0bac2b7f5a82e51 Author: Guillaume Nodet <[email protected]> AuthorDate: Thu Mar 12 15:17:59 2026 +0100 CAMEL-22760: Bridge spring.kafka.* properties to camel-kafka component When using camel-kafka-starter with Spring Boot, users previously had to duplicate their Kafka configuration under both spring.kafka.* and camel.component.kafka.* properties. This adds a SpringKafkaPropertiesAutoConfiguration that automatically bridges Spring Boot's KafkaProperties to the Camel Kafka component configuration, including: - bootstrap-servers -> brokers - security.protocol -> security-protocol - ssl.* properties (keystore, truststore, types, passwords) - consumer.group-id -> group-id - client-id - SASL properties from spring.kafka.properties map Explicit camel.component.kafka.* settings always take precedence. Co-Authored-By: Claude Opus 4.6 <[email protected]> --- components-starter/camel-kafka-starter/pom.xml | 6 + .../SpringKafkaPropertiesAutoConfiguration.java | 210 +++++++++++++++++++++ ...rk.boot.autoconfigure.AutoConfiguration.imports | 1 + .../SpringKafkaPropertiesBridgeTest.java | 175 +++++++++++++++++ 4 files changed, 392 insertions(+) diff --git a/components-starter/camel-kafka-starter/pom.xml b/components-starter/camel-kafka-starter/pom.xml index 07a6cfb1ace..b4e69d95841 100644 --- a/components-starter/camel-kafka-starter/pom.xml +++ b/components-starter/camel-kafka-starter/pom.xml @@ -52,6 +52,12 @@ </exclusions> --> </dependency> + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-kafka</artifactId> + <version>${spring-boot-version}</version> + <optional>true</optional> + </dependency> <!-- test --> <dependency> <groupId>org.apache.camel</groupId> diff --git a/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/SpringKafkaPropertiesAutoConfiguration.java 
b/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/SpringKafkaPropertiesAutoConfiguration.java new file mode 100644 index 00000000000..75cea2a6793 --- /dev/null +++ b/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/SpringKafkaPropertiesAutoConfiguration.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.kafka.springboot; + +import java.util.Collections; +import java.util.Map; + +import jakarta.annotation.PostConstruct; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.autoconfigure.AutoConfigureBefore; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.context.properties.bind.Bindable; +import org.springframework.boot.context.properties.bind.Binder; +import org.springframework.boot.kafka.autoconfigure.KafkaProperties; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.env.Environment; +import org.springframework.core.io.Resource; + +/** + * Auto-configuration that bridges Spring Boot's {@code spring.kafka.*} properties + * to the Camel Kafka component configuration ({@code camel.component.kafka.*}). + * <p> + * This allows users to configure Kafka once using standard Spring Boot properties + * and have the Camel Kafka component automatically pick up those settings, without + * needing to duplicate configuration under {@code camel.component.kafka.*}. + * <p> + * If a property is explicitly set under {@code camel.component.kafka.*}, it takes + * precedence over the corresponding {@code spring.kafka.*} property. 
+ */
+@Configuration(proxyBeanMethods = false)
+@ConditionalOnClass(KafkaProperties.class)
+@AutoConfigureBefore(KafkaComponentAutoConfiguration.class)
+public class SpringKafkaPropertiesAutoConfiguration {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SpringKafkaPropertiesAutoConfiguration.class);
+
+    private final KafkaProperties kafkaProperties;
+    private final KafkaComponentConfiguration camelKafkaConfig;
+    private final Environment environment;
+
+    /**
+     * Creates the bridge.
+     * <p>
+     * NOTE(review): both {@link KafkaProperties} and {@link KafkaComponentConfiguration} are required
+     * constructor arguments, while the class-level condition only checks that the
+     * {@code KafkaProperties} class is present. Confirm that both beans are always registered when this
+     * configuration is active.
+     *
+     * @param kafkaProperties  the bound {@code spring.kafka.*} properties (source of the bridge)
+     * @param camelKafkaConfig the {@code camel.component.kafka.*} configuration bean (target of the bridge)
+     * @param environment      used to find out which {@code camel.component.kafka.*} keys the user set
+     */
+    public SpringKafkaPropertiesAutoConfiguration(
+            KafkaProperties kafkaProperties,
+            KafkaComponentConfiguration camelKafkaConfig,
+            Environment environment) {
+        this.kafkaProperties = kafkaProperties;
+        this.camelKafkaConfig = camelKafkaConfig;
+        this.environment = environment;
+    }
+
+    /**
+     * Copies the supported {@code spring.kafka.*} values onto the Camel Kafka configuration bean.
+     * A value is copied only when the matching {@code camel.component.kafka.*} property was not set
+     * by the user, whatever spelling (kebab-case, camelCase, environment-variable style) was used.
+     */
+    @PostConstruct
+    public void bridgeProperties() {
+        // Everything the user set under camel.component.kafka.*. The map keys keep the spelling used
+        // in the property source, so they are only ever inspected through isExplicitlySet(...).
+        Map<String, Object> camelKafkaProps = Binder.get(environment)
+                .bind("camel.component.kafka", Bindable.mapOf(String.class, Object.class))
+                .orElse(Collections.emptyMap());
+
+        boolean bridged = false;
+
+        // Bootstrap servers
+        // NOTE(review): KafkaProperties may carry a built-in default for bootstrap-servers; if so this
+        // branch also runs when the user configured nothing under spring.kafka.* - confirm intended.
+        if (!isExplicitlySet(camelKafkaProps, "brokers")
+                && kafkaProperties.getBootstrapServers() != null
+                && !kafkaProperties.getBootstrapServers().isEmpty()) {
+            String brokers = String.join(",", kafkaProperties.getBootstrapServers());
+            camelKafkaConfig.setBrokers(brokers);
+            LOG.debug("Bridged spring.kafka.bootstrap-servers -> camel.component.kafka.brokers: {}", brokers);
+            bridged = true;
+        }
+
+        // Client ID
+        if (!isExplicitlySet(camelKafkaProps, "client-id")
+                && kafkaProperties.getClientId() != null) {
+            camelKafkaConfig.setClientId(kafkaProperties.getClientId());
+            LOG.debug("Bridged spring.kafka.client-id -> camel.component.kafka.client-id");
+            bridged = true;
+        }
+
+        // Security protocol
+        if (!isExplicitlySet(camelKafkaProps, "security-protocol")
+                && kafkaProperties.getSecurity() != null
+                && kafkaProperties.getSecurity().getProtocol() != null) {
+            camelKafkaConfig.setSecurityProtocol(kafkaProperties.getSecurity().getProtocol());
+            LOG.debug("Bridged spring.kafka.security.protocol -> camel.component.kafka.security-protocol");
+            bridged = true;
+        }
+
+        // Consumer group ID
+        if (!isExplicitlySet(camelKafkaProps, "group-id")
+                && kafkaProperties.getConsumer() != null
+                && kafkaProperties.getConsumer().getGroupId() != null) {
+            camelKafkaConfig.setGroupId(kafkaProperties.getConsumer().getGroupId());
+            LOG.debug("Bridged spring.kafka.consumer.group-id -> camel.component.kafka.group-id");
+            bridged = true;
+        }
+
+        // SSL properties
+        bridged |= bridgeSslProperties(camelKafkaProps);
+
+        // SASL properties from spring.kafka.properties map
+        bridged |= bridgeSaslProperties(camelKafkaProps);
+
+        if (bridged) {
+            LOG.info("Bridged spring.kafka.* properties to camel.component.kafka.*");
+        }
+    }
+
+    /**
+     * Bridges the {@code spring.kafka.ssl.*} settings (key/trust store location, password and type,
+     * key password, protocol) that have no explicit Camel counterpart.
+     *
+     * @param camelKafkaProps the user-supplied {@code camel.component.kafka.*} properties
+     * @return {@code true} if at least one value was copied
+     */
+    private boolean bridgeSslProperties(Map<String, Object> camelKafkaProps) {
+        KafkaProperties.Ssl ssl = kafkaProperties.getSsl();
+        if (ssl == null) {
+            return false;
+        }
+
+        boolean bridged = false;
+
+        if (!isExplicitlySet(camelKafkaProps, "ssl-key-password") && ssl.getKeyPassword() != null) {
+            camelKafkaConfig.setSslKeyPassword(ssl.getKeyPassword());
+            bridged = true;
+        }
+        if (!isExplicitlySet(camelKafkaProps, "ssl-keystore-location") && ssl.getKeyStoreLocation() != null) {
+            camelKafkaConfig.setSslKeystoreLocation(resourceToPath(ssl.getKeyStoreLocation()));
+            bridged = true;
+        }
+        if (!isExplicitlySet(camelKafkaProps, "ssl-keystore-password") && ssl.getKeyStorePassword() != null) {
+            camelKafkaConfig.setSslKeystorePassword(ssl.getKeyStorePassword());
+            bridged = true;
+        }
+        if (!isExplicitlySet(camelKafkaProps, "ssl-keystore-type") && ssl.getKeyStoreType() != null) {
+            camelKafkaConfig.setSslKeystoreType(ssl.getKeyStoreType());
+            bridged = true;
+        }
+        if (!isExplicitlySet(camelKafkaProps, "ssl-truststore-location") && ssl.getTrustStoreLocation() != null) {
+            camelKafkaConfig.setSslTruststoreLocation(resourceToPath(ssl.getTrustStoreLocation()));
+            bridged = true;
+        }
+        if (!isExplicitlySet(camelKafkaProps, "ssl-truststore-password") && ssl.getTrustStorePassword() != null) {
+            camelKafkaConfig.setSslTruststorePassword(ssl.getTrustStorePassword());
+            bridged = true;
+        }
+        if (!isExplicitlySet(camelKafkaProps, "ssl-truststore-type") && ssl.getTrustStoreType() != null) {
+            camelKafkaConfig.setSslTruststoreType(ssl.getTrustStoreType());
+            bridged = true;
+        }
+        if (!isExplicitlySet(camelKafkaProps, "ssl-protocol") && ssl.getProtocol() != null) {
+            camelKafkaConfig.setSslProtocol(ssl.getProtocol());
+            bridged = true;
+        }
+
+        if (bridged) {
+            LOG.debug("Bridged spring.kafka.ssl.* properties to camel.component.kafka.ssl-*");
+        }
+        return bridged;
+    }
+
+    /**
+     * Bridges the SASL entries ({@code sasl.mechanism}, {@code sasl.jaas.config},
+     * {@code sasl.kerberos.service.name}) found in the free-form {@code spring.kafka.properties} map.
+     * The values themselves are never logged.
+     *
+     * @param camelKafkaProps the user-supplied {@code camel.component.kafka.*} properties
+     * @return {@code true} if at least one value was copied
+     */
+    private boolean bridgeSaslProperties(Map<String, Object> camelKafkaProps) {
+        Map<String, String> rawProps = kafkaProperties.getProperties();
+        if (rawProps == null || rawProps.isEmpty()) {
+            return false;
+        }
+
+        boolean bridged = false;
+
+        if (!isExplicitlySet(camelKafkaProps, "sasl-mechanism")
+                && rawProps.containsKey("sasl.mechanism")) {
+            camelKafkaConfig.setSaslMechanism(rawProps.get("sasl.mechanism"));
+            LOG.debug("Bridged spring.kafka.properties[sasl.mechanism] -> camel.component.kafka.sasl-mechanism");
+            bridged = true;
+        }
+        if (!isExplicitlySet(camelKafkaProps, "sasl-jaas-config")
+                && rawProps.containsKey("sasl.jaas.config")) {
+            camelKafkaConfig.setSaslJaasConfig(rawProps.get("sasl.jaas.config"));
+            LOG.debug("Bridged spring.kafka.properties[sasl.jaas.config] -> camel.component.kafka.sasl-jaas-config");
+            bridged = true;
+        }
+        if (!isExplicitlySet(camelKafkaProps, "sasl-kerberos-service-name")
+                && rawProps.containsKey("sasl.kerberos.service.name")) {
+            camelKafkaConfig.setSaslKerberosServiceName(rawProps.get("sasl.kerberos.service.name"));
+            LOG.debug("Bridged spring.kafka.properties[sasl.kerberos.service.name] -> camel.component.kafka.sasl-kerberos-service-name");
+            bridged = true;
+        }
+
+        return bridged;
+    }
+
+    /**
+     * Tells whether the user set the given Camel Kafka option, tolerating the spellings accepted by
+     * relaxed binding: {@code client-id}, {@code clientId}, {@code client_id} and {@code CLIENTID}
+     * all match {@code "client-id"}.
+     *
+     * @param camelKafkaProps the user-supplied {@code camel.component.kafka.*} properties
+     * @param name            the option name in kebab-case, e.g. {@code ssl-keystore-location}
+     * @return {@code true} if some key of the map denotes the same option
+     */
+    private static boolean isExplicitlySet(Map<String, Object> camelKafkaProps, String name) {
+        String wanted = canonicalName(name);
+        for (String key : camelKafkaProps.keySet()) {
+            if (canonicalName(key).equals(wanted)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Reduces a property name to the form used for comparison: lower-case, with dashes and
+     * underscores removed.
+     */
+    private static String canonicalName(String name) {
+        StringBuilder canonical = new StringBuilder(name.length());
+        for (int i = 0; i < name.length(); i++) {
+            char c = name.charAt(i);
+            if (c != '-' && c != '_') {
+                canonical.append(Character.toLowerCase(c));
+            }
+        }
+        return canonical.toString();
+    }
+
+    /**
+     * Converts a Spring {@link Resource} into the string handed to the Camel configuration:
+     * the absolute file path when the resource resolves to a file, otherwise its URI, otherwise
+     * its {@code toString()} form.
+     */
+    private static String resourceToPath(Resource resource) {
+        try {
+            return resource.getFile().getAbsolutePath();
+        } catch (Exception notAFile) {
+            // Deliberate best effort: a store location that is not a plain file must not abort startup.
+            LOG.debug("Resource {} is not resolvable as a file, falling back to its URI", resource);
+            try {
+                return resource.getURI().toString();
+            } catch (Exception ignored) {
+                // No URI either; the textual description is the last thing left to pass on.
+                return resource.toString();
+            }
+        }
+    }
+}
diff --git a/components-starter/camel-kafka-starter/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/components-starter/camel-kafka-starter/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
index 829a9783d16..80534284c05 100644
--- a/components-starter/camel-kafka-starter/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
+++ b/components-starter/camel-kafka-starter/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -17,3 +17,4 @@
 org.apache.camel.component.kafka.springboot.KafkaComponentConverter
 org.apache.camel.component.kafka.springboot.KafkaComponentAutoConfiguration
+org.apache.camel.component.kafka.springboot.SpringKafkaPropertiesAutoConfiguration
diff --git a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/springboot/SpringKafkaPropertiesBridgeTest.java b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/springboot/SpringKafkaPropertiesBridgeTest.java
new file mode 100644
index 00000000000..5b9bcf4aa0c
--- /dev/null
+++ b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/springboot/SpringKafkaPropertiesBridgeTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.springboot;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.component.kafka.KafkaComponent;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.AutoConfigurations;
+import org.springframework.boot.kafka.autoconfigure.KafkaAutoConfiguration;
+import org.springframework.boot.test.context.runner.ApplicationContextRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Checks that {@code spring.kafka.*} settings reach the Camel Kafka component and that an
+ * explicit {@code camel.component.kafka.*} setting is preferred over the bridged value.
+ */
+class SpringKafkaPropertiesBridgeTest {
+
+    /** Property passed to every scenario alongside the setting under test. */
+    private static final String CAMEL_KAFKA_ENABLED = "camel.component.kafka.enabled=true";
+
+    private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
+            .withConfiguration(AutoConfigurations.of(
+                    CamelAutoConfiguration.class,
+                    KafkaAutoConfiguration.class,
+                    KafkaComponentAutoConfiguration.class,
+                    SpringKafkaPropertiesAutoConfiguration.class));
+
+    /** Looks up the {@code kafka} component of the given Camel context. */
+    private static KafkaComponent kafkaComponent(CamelContext camelContext) {
+        return camelContext.getComponent("kafka", KafkaComponent.class);
+    }
+
+    @Test
+    void shouldBridgeBootstrapServers() {
+        contextRunner
+                .withPropertyValues("spring.kafka.bootstrap-servers=broker1:9092,broker2:9092", CAMEL_KAFKA_ENABLED)
+                .run(ctx -> assertThat(
+                        kafkaComponent(ctx.getBean(CamelContext.class)).getConfiguration().getBrokers())
+                        .isEqualTo("broker1:9092,broker2:9092"));
+    }
+
+    @Test
+    void shouldPreferExplicitCamelProperty() {
+        contextRunner
+                .withPropertyValues(
+                        "spring.kafka.bootstrap-servers=broker1:9092",
+                        "camel.component.kafka.brokers=my-broker:9092",
+                        CAMEL_KAFKA_ENABLED)
+                .run(ctx -> assertThat(
+                        kafkaComponent(ctx.getBean(CamelContext.class)).getConfiguration().getBrokers())
+                        .isEqualTo("my-broker:9092"));
+    }
+
+    @Test
+    void shouldBridgeSecurityProtocol() {
+        contextRunner
+                .withPropertyValues("spring.kafka.security.protocol=SASL_SSL", CAMEL_KAFKA_ENABLED)
+                .run(ctx -> assertThat(
+                        kafkaComponent(ctx.getBean(CamelContext.class)).getConfiguration().getSecurityProtocol())
+                        .isEqualTo("SASL_SSL"));
+    }
+
+    @Test
+    void shouldBridgeSslProperties() {
+        contextRunner
+                .withPropertyValues(
+                        "spring.kafka.ssl.key-store-password=keypass",
+                        "spring.kafka.ssl.key-store-type=PKCS12",
+                        "spring.kafka.ssl.trust-store-password=trustpass",
+                        "spring.kafka.ssl.trust-store-type=PEM",
+                        CAMEL_KAFKA_ENABLED)
+                .run(ctx -> {
+                    // Resolve the component once; the four assertions run in the original order.
+                    KafkaComponent component = kafkaComponent(ctx.getBean(CamelContext.class));
+                    assertThat(component.getConfiguration().getSslKeystorePassword()).isEqualTo("keypass");
+                    assertThat(component.getConfiguration().getSslKeystoreType()).isEqualTo("PKCS12");
+                    assertThat(component.getConfiguration().getSslTruststorePassword()).isEqualTo("trustpass");
+                    assertThat(component.getConfiguration().getSslTruststoreType()).isEqualTo("PEM");
+                });
+    }
+
+    @Test
+    void shouldBridgeConsumerGroupId() {
+        contextRunner
+                .withPropertyValues("spring.kafka.consumer.group-id=my-group", CAMEL_KAFKA_ENABLED)
+                .run(ctx -> assertThat(
+                        kafkaComponent(ctx.getBean(CamelContext.class)).getConfiguration().getGroupId())
+                        .isEqualTo("my-group"));
+    }
+
+    @Test
+    void shouldBridgeSaslMechanismFromProperties() {
+        contextRunner
+                .withPropertyValues("spring.kafka.properties.sasl.mechanism=PLAIN", CAMEL_KAFKA_ENABLED)
+                .run(ctx -> assertThat(
+                        kafkaComponent(ctx.getBean(CamelContext.class)).getConfiguration().getSaslMechanism())
+                        .isEqualTo("PLAIN"));
+    }
+
+    @Test
+    void shouldBridgeSaslJaasConfigFromProperties() {
+        String jaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"pass\";";
+        contextRunner
+                .withPropertyValues("spring.kafka.properties.sasl.jaas.config=" + jaasConfig, CAMEL_KAFKA_ENABLED)
+                .run(ctx -> assertThat(
+                        kafkaComponent(ctx.getBean(CamelContext.class)).getConfiguration().getSaslJaasConfig())
+                        .isEqualTo(jaasConfig));
+    }
+
+    @Test
+    void shouldNotBridgeWhenSpringKafkaNotConfigured() {
+        contextRunner
+                .withPropertyValues(CAMEL_KAFKA_ENABLED)
+                // With nothing set under spring.kafka.*, the default protocol value should remain.
+                .run(ctx -> assertThat(
+                        kafkaComponent(ctx.getBean(CamelContext.class)).getConfiguration().getSecurityProtocol())
+                        .isEqualTo("PLAINTEXT"));
+    }
+
+    @Test
+    void shouldBridgeClientId() {
+        contextRunner
+                .withPropertyValues("spring.kafka.client-id=my-client", CAMEL_KAFKA_ENABLED)
+                .run(ctx -> assertThat(
+                        kafkaComponent(ctx.getBean(CamelContext.class)).getConfiguration().getClientId())
+                        .isEqualTo("my-client"));
+    }
+}
