This is an automated email from the ASF dual-hosted git repository.

gnodet pushed a commit to branch feat/camel-22760-kafka-spring-boot-properties
in repository https://gitbox.apache.org/repos/asf/camel-spring-boot.git

commit afa9b4e105b4d21f3f37a7aeb0bac2b7f5a82e51
Author: Guillaume Nodet <[email protected]>
AuthorDate: Thu Mar 12 15:17:59 2026 +0100

    CAMEL-22760: Bridge spring.kafka.* properties to camel-kafka component
    
    When using camel-kafka-starter with Spring Boot, users previously had
    to duplicate their Kafka configuration under both spring.kafka.* and
    camel.component.kafka.* properties.
    
    This adds a SpringKafkaPropertiesAutoConfiguration that automatically
    bridges Spring Boot's KafkaProperties to the Camel Kafka component
    configuration, including:
    - bootstrap-servers -> brokers
    - security.protocol -> security-protocol
    - ssl.* properties (keystore, truststore, types, passwords)
    - consumer.group-id -> group-id
    - client-id
    - SASL properties from spring.kafka.properties map
    
    Explicit camel.component.kafka.* settings always take precedence.
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
---
 components-starter/camel-kafka-starter/pom.xml     |   6 +
 .../SpringKafkaPropertiesAutoConfiguration.java    | 210 +++++++++++++++++++++
 ...rk.boot.autoconfigure.AutoConfiguration.imports |   1 +
 .../SpringKafkaPropertiesBridgeTest.java           | 175 +++++++++++++++++
 4 files changed, 392 insertions(+)

diff --git a/components-starter/camel-kafka-starter/pom.xml b/components-starter/camel-kafka-starter/pom.xml
index 07a6cfb1ace..b4e69d95841 100644
--- a/components-starter/camel-kafka-starter/pom.xml
+++ b/components-starter/camel-kafka-starter/pom.xml
@@ -52,6 +52,12 @@
       </exclusions>
       -->
     </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-kafka</artifactId>
+      <version>${spring-boot-version}</version>
+      <optional>true</optional>
+    </dependency>
     <!-- test -->
     <dependency>
       <groupId>org.apache.camel</groupId>
diff --git a/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/SpringKafkaPropertiesAutoConfiguration.java b/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/SpringKafkaPropertiesAutoConfiguration.java
new file mode 100644
index 00000000000..75cea2a6793
--- /dev/null
+++ b/components-starter/camel-kafka-starter/src/main/java/org/apache/camel/component/kafka/springboot/SpringKafkaPropertiesAutoConfiguration.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.springboot;
+
+import java.util.Collections;
+import java.util.Map;
+
+import jakarta.annotation.PostConstruct;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.autoconfigure.AutoConfigureBefore;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
+import org.springframework.boot.context.properties.bind.Bindable;
+import org.springframework.boot.context.properties.bind.Binder;
+import org.springframework.boot.kafka.autoconfigure.KafkaProperties;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.core.env.Environment;
+import org.springframework.core.io.Resource;
+
+/**
+ * Auto-configuration that bridges Spring Boot's {@code spring.kafka.*} properties
+ * to the Camel Kafka component configuration ({@code camel.component.kafka.*}).
+ * <p>
+ * This allows users to configure Kafka once using standard Spring Boot properties
+ * and have the Camel Kafka component automatically pick up those settings, without
+ * needing to duplicate configuration under {@code camel.component.kafka.*}.
+ * <p>
+ * If a property is explicitly set under {@code camel.component.kafka.*}, it takes
+ * precedence over the corresponding {@code spring.kafka.*} property. Explicit settings
+ * are looked up through the Spring Boot {@link Binder}, so they are recognised in any
+ * relaxed form (for example {@code sslKeystoreLocation} as well as
+ * {@code ssl-keystore-location}).
+ * <p>
+ * NOTE(review): the constructor needs {@link KafkaProperties} and
+ * {@code KafkaComponentConfiguration} beans; confirm both are always registered when
+ * this configuration is active (for example when Spring Boot's own Kafka
+ * auto-configuration is excluded).
+ */
+@Configuration(proxyBeanMethods = false)
+@ConditionalOnClass(KafkaProperties.class)
+@AutoConfigureBefore(KafkaComponentAutoConfiguration.class)
+public class SpringKafkaPropertiesAutoConfiguration {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SpringKafkaPropertiesAutoConfiguration.class);
+
+    /** Prefix shared by all Camel Kafka component properties. */
+    private static final String CAMEL_KAFKA_PREFIX = "camel.component.kafka.";
+
+    private final KafkaProperties kafkaProperties;
+    private final KafkaComponentConfiguration camelKafkaConfig;
+    private final Environment environment;
+
+    public SpringKafkaPropertiesAutoConfiguration(
+            KafkaProperties kafkaProperties,
+            KafkaComponentConfiguration camelKafkaConfig,
+            Environment environment) {
+        this.kafkaProperties = kafkaProperties;
+        this.camelKafkaConfig = camelKafkaConfig;
+        this.environment = environment;
+    }
+
+    /**
+     * Copies the supported {@code spring.kafka.*} values onto the Camel Kafka component
+     * configuration, skipping every property that is explicitly configured under
+     * {@code camel.component.kafka.*}.
+     */
+    @PostConstruct
+    public void bridgeProperties() {
+        // Ask the binder per property instead of inspecting a map bound from the prefix:
+        // map keys are not normalised to kebab-case, so an explicit setting written as
+        // "sslKeystoreLocation" or supplied through an environment variable would be
+        // missed and then overridden by the spring.kafka.* value.
+        Binder binder = Binder.get(environment);
+
+        boolean bridged = false;
+
+        // Bootstrap servers
+        // NOTE(review): KafkaProperties may report a default server list even when
+        // spring.kafka.bootstrap-servers is not set - confirm that bridging such a
+        // default is intended.
+        if (shouldBridge(binder, "brokers", kafkaProperties.getBootstrapServers())
+                && !kafkaProperties.getBootstrapServers().isEmpty()) {
+            String brokers = String.join(",", kafkaProperties.getBootstrapServers());
+            camelKafkaConfig.setBrokers(brokers);
+            LOG.debug("Bridged spring.kafka.bootstrap-servers -> camel.component.kafka.brokers: {}", brokers);
+            bridged = true;
+        }
+
+        // Client ID
+        if (shouldBridge(binder, "client-id", kafkaProperties.getClientId())) {
+            camelKafkaConfig.setClientId(kafkaProperties.getClientId());
+            LOG.debug("Bridged spring.kafka.client-id -> camel.component.kafka.client-id");
+            bridged = true;
+        }
+
+        // Security protocol
+        if (kafkaProperties.getSecurity() != null
+                && shouldBridge(binder, "security-protocol", kafkaProperties.getSecurity().getProtocol())) {
+            camelKafkaConfig.setSecurityProtocol(kafkaProperties.getSecurity().getProtocol());
+            LOG.debug("Bridged spring.kafka.security.protocol -> camel.component.kafka.security-protocol");
+            bridged = true;
+        }
+
+        // Consumer group ID
+        if (kafkaProperties.getConsumer() != null
+                && shouldBridge(binder, "group-id", kafkaProperties.getConsumer().getGroupId())) {
+            camelKafkaConfig.setGroupId(kafkaProperties.getConsumer().getGroupId());
+            LOG.debug("Bridged spring.kafka.consumer.group-id -> camel.component.kafka.group-id");
+            bridged = true;
+        }
+
+        // SSL properties
+        bridged |= bridgeSslProperties(binder);
+
+        // SASL properties from spring.kafka.properties map
+        bridged |= bridgeSaslProperties(binder);
+
+        if (bridged) {
+            LOG.info("Bridged spring.kafka.* properties to camel.component.kafka.*");
+        }
+    }
+
+    /**
+     * Bridges the {@code spring.kafka.ssl.*} settings (key/trust store location, password
+     * and type, key password, protocol) to their {@code camel.component.kafka.ssl-*}
+     * counterparts.
+     *
+     * @param binder binder used to detect explicitly configured Camel properties
+     * @return {@code true} if at least one value was copied
+     */
+    private boolean bridgeSslProperties(Binder binder) {
+        KafkaProperties.Ssl ssl = kafkaProperties.getSsl();
+        if (ssl == null) {
+            return false;
+        }
+
+        boolean bridged = false;
+
+        if (shouldBridge(binder, "ssl-key-password", ssl.getKeyPassword())) {
+            camelKafkaConfig.setSslKeyPassword(ssl.getKeyPassword());
+            bridged = true;
+        }
+        if (shouldBridge(binder, "ssl-keystore-location", ssl.getKeyStoreLocation())) {
+            camelKafkaConfig.setSslKeystoreLocation(resourceToPath(ssl.getKeyStoreLocation()));
+            bridged = true;
+        }
+        if (shouldBridge(binder, "ssl-keystore-password", ssl.getKeyStorePassword())) {
+            camelKafkaConfig.setSslKeystorePassword(ssl.getKeyStorePassword());
+            bridged = true;
+        }
+        if (shouldBridge(binder, "ssl-keystore-type", ssl.getKeyStoreType())) {
+            camelKafkaConfig.setSslKeystoreType(ssl.getKeyStoreType());
+            bridged = true;
+        }
+        if (shouldBridge(binder, "ssl-truststore-location", ssl.getTrustStoreLocation())) {
+            camelKafkaConfig.setSslTruststoreLocation(resourceToPath(ssl.getTrustStoreLocation()));
+            bridged = true;
+        }
+        if (shouldBridge(binder, "ssl-truststore-password", ssl.getTrustStorePassword())) {
+            camelKafkaConfig.setSslTruststorePassword(ssl.getTrustStorePassword());
+            bridged = true;
+        }
+        if (shouldBridge(binder, "ssl-truststore-type", ssl.getTrustStoreType())) {
+            camelKafkaConfig.setSslTruststoreType(ssl.getTrustStoreType());
+            bridged = true;
+        }
+        if (shouldBridge(binder, "ssl-protocol", ssl.getProtocol())) {
+            camelKafkaConfig.setSslProtocol(ssl.getProtocol());
+            bridged = true;
+        }
+
+        if (bridged) {
+            LOG.debug("Bridged spring.kafka.ssl.* properties to camel.component.kafka.ssl-*");
+        }
+        return bridged;
+    }
+
+    /**
+     * Bridges the SASL entries found in the free-form {@code spring.kafka.properties} map
+     * ({@code sasl.mechanism}, {@code sasl.jaas.config}, {@code sasl.kerberos.service.name}).
+     *
+     * @param binder binder used to detect explicitly configured Camel properties
+     * @return {@code true} if at least one value was copied
+     */
+    private boolean bridgeSaslProperties(Binder binder) {
+        Map<String, String> rawProps = kafkaProperties.getProperties();
+        if (rawProps == null) {
+            rawProps = Collections.emptyMap();
+        }
+
+        boolean bridged = false;
+
+        if (shouldBridge(binder, "sasl-mechanism", rawProps.get("sasl.mechanism"))) {
+            camelKafkaConfig.setSaslMechanism(rawProps.get("sasl.mechanism"));
+            LOG.debug("Bridged spring.kafka.properties[sasl.mechanism] -> camel.component.kafka.sasl-mechanism");
+            bridged = true;
+        }
+        if (shouldBridge(binder, "sasl-jaas-config", rawProps.get("sasl.jaas.config"))) {
+            camelKafkaConfig.setSaslJaasConfig(rawProps.get("sasl.jaas.config"));
+            LOG.debug("Bridged spring.kafka.properties[sasl.jaas.config] -> camel.component.kafka.sasl-jaas-config");
+            bridged = true;
+        }
+        if (shouldBridge(binder, "sasl-kerberos-service-name", rawProps.get("sasl.kerberos.service.name"))) {
+            camelKafkaConfig.setSaslKerberosServiceName(rawProps.get("sasl.kerberos.service.name"));
+            LOG.debug("Bridged spring.kafka.properties[sasl.kerberos.service.name] -> camel.component.kafka.sasl-kerberos-service-name");
+            bridged = true;
+        }
+
+        return bridged;
+    }
+
+    /**
+     * Decides whether a Spring value should be copied to the given Camel property.
+     *
+     * @param binder      binder over the application environment
+     * @param camelKey    kebab-case property name relative to {@code camel.component.kafka.}
+     * @param springValue value read from the Spring Kafka properties, may be {@code null}
+     * @return {@code true} if a Spring value exists and the Camel property is not explicitly set
+     */
+    private static boolean shouldBridge(Binder binder, String camelKey, Object springValue) {
+        return springValue != null && !isExplicitlySet(binder, camelKey);
+    }
+
+    /**
+     * Checks whether {@code camel.component.kafka.<camelKey>} can be bound from the environment.
+     *
+     * @param binder   binder over the application environment
+     * @param camelKey kebab-case property name relative to {@code camel.component.kafka.}
+     * @return {@code true} if the environment supplies a value for the property
+     */
+    private static boolean isExplicitlySet(Binder binder, String camelKey) {
+        return binder.bind(CAMEL_KAFKA_PREFIX + camelKey, Bindable.of(String.class)).isBound();
+    }
+
+    /**
+     * Converts a store location into the string handed to the Camel configuration.
+     * The absolute file path is preferred; if the resource is not available as a file the
+     * URI (or, failing that, the resource description) is used and a warning is logged,
+     * because such a value may not be usable as a key store or trust store location.
+     *
+     * @param resource the configured key store or trust store resource
+     * @return the absolute file path, or the best-effort fallback described above
+     */
+    private static String resourceToPath(Resource resource) {
+        try {
+            return resource.getFile().getAbsolutePath();
+        } catch (Exception e) {
+            String fallback;
+            try {
+                fallback = resource.getURI().toString();
+            } catch (Exception ex) {
+                fallback = resource.toString();
+            }
+            // Keep the best-effort behaviour, but make the substitution visible.
+            LOG.warn("Cannot resolve {} to a file system path ({}); falling back to {}",
+                    resource, e.toString(), fallback);
+            return fallback;
+        }
+    }
+}
diff --git a/components-starter/camel-kafka-starter/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/components-starter/camel-kafka-starter/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
index 829a9783d16..80534284c05 100644
--- a/components-starter/camel-kafka-starter/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
+++ b/components-starter/camel-kafka-starter/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -17,3 +17,4 @@
 
 org.apache.camel.component.kafka.springboot.KafkaComponentConverter
 org.apache.camel.component.kafka.springboot.KafkaComponentAutoConfiguration
+org.apache.camel.component.kafka.springboot.SpringKafkaPropertiesAutoConfiguration
diff --git a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/springboot/SpringKafkaPropertiesBridgeTest.java b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/springboot/SpringKafkaPropertiesBridgeTest.java
new file mode 100644
index 00000000000..5b9bcf4aa0c
--- /dev/null
+++ b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/springboot/SpringKafkaPropertiesBridgeTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.springboot;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.component.kafka.KafkaComponent;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.AutoConfigurations;
+import org.springframework.boot.kafka.autoconfigure.KafkaAutoConfiguration;
+import org.springframework.boot.test.context.runner.ApplicationContextRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class SpringKafkaPropertiesBridgeTest {
+
+    private final ApplicationContextRunner contextRunner = new 
ApplicationContextRunner()
+            .withConfiguration(AutoConfigurations.of(
+                    CamelAutoConfiguration.class,
+                    KafkaAutoConfiguration.class,
+                    KafkaComponentAutoConfiguration.class,
+                    SpringKafkaPropertiesAutoConfiguration.class));
+
+    @Test
+    void shouldBridgeBootstrapServers() {
+        contextRunner
+                .withPropertyValues(
+                        
"spring.kafka.bootstrap-servers=broker1:9092,broker2:9092",
+                        "camel.component.kafka.enabled=true")
+                .run(context -> {
+                    KafkaComponent kafka = context.getBean(CamelContext.class)
+                            .getComponent("kafka", KafkaComponent.class);
+                    assertThat(kafka.getConfiguration().getBrokers())
+                            .isEqualTo("broker1:9092,broker2:9092");
+                });
+    }
+
+    @Test
+    void shouldPreferExplicitCamelProperty() {
+        contextRunner
+                .withPropertyValues(
+                        "spring.kafka.bootstrap-servers=broker1:9092",
+                        "camel.component.kafka.brokers=my-broker:9092",
+                        "camel.component.kafka.enabled=true")
+                .run(context -> {
+                    KafkaComponent kafka = context.getBean(CamelContext.class)
+                            .getComponent("kafka", KafkaComponent.class);
+                    assertThat(kafka.getConfiguration().getBrokers())
+                            .isEqualTo("my-broker:9092");
+                });
+    }
+
+    @Test
+    void shouldBridgeSecurityProtocol() {
+        contextRunner
+                .withPropertyValues(
+                        "spring.kafka.security.protocol=SASL_SSL",
+                        "camel.component.kafka.enabled=true")
+                .run(context -> {
+                    KafkaComponent kafka = context.getBean(CamelContext.class)
+                            .getComponent("kafka", KafkaComponent.class);
+                    assertThat(kafka.getConfiguration().getSecurityProtocol())
+                            .isEqualTo("SASL_SSL");
+                });
+    }
+
+    @Test
+    void shouldBridgeSslProperties() {
+        contextRunner
+                .withPropertyValues(
+                        "spring.kafka.ssl.key-store-password=keypass",
+                        "spring.kafka.ssl.key-store-type=PKCS12",
+                        "spring.kafka.ssl.trust-store-password=trustpass",
+                        "spring.kafka.ssl.trust-store-type=PEM",
+                        "camel.component.kafka.enabled=true")
+                .run(context -> {
+                    KafkaComponent kafka = context.getBean(CamelContext.class)
+                            .getComponent("kafka", KafkaComponent.class);
+                    
assertThat(kafka.getConfiguration().getSslKeystorePassword())
+                            .isEqualTo("keypass");
+                    assertThat(kafka.getConfiguration().getSslKeystoreType())
+                            .isEqualTo("PKCS12");
+                    
assertThat(kafka.getConfiguration().getSslTruststorePassword())
+                            .isEqualTo("trustpass");
+                    assertThat(kafka.getConfiguration().getSslTruststoreType())
+                            .isEqualTo("PEM");
+                });
+    }
+
+    @Test
+    void shouldBridgeConsumerGroupId() {
+        contextRunner
+                .withPropertyValues(
+                        "spring.kafka.consumer.group-id=my-group",
+                        "camel.component.kafka.enabled=true")
+                .run(context -> {
+                    KafkaComponent kafka = context.getBean(CamelContext.class)
+                            .getComponent("kafka", KafkaComponent.class);
+                    assertThat(kafka.getConfiguration().getGroupId())
+                            .isEqualTo("my-group");
+                });
+    }
+
+    @Test
+    void shouldBridgeSaslMechanismFromProperties() {
+        contextRunner
+                .withPropertyValues(
+                        "spring.kafka.properties.sasl.mechanism=PLAIN",
+                        "camel.component.kafka.enabled=true")
+                .run(context -> {
+                    KafkaComponent kafka = context.getBean(CamelContext.class)
+                            .getComponent("kafka", KafkaComponent.class);
+                    assertThat(kafka.getConfiguration().getSaslMechanism())
+                            .isEqualTo("PLAIN");
+                });
+    }
+
+    @Test
+    void shouldBridgeSaslJaasConfigFromProperties() {
+        String jaasConfig = 
"org.apache.kafka.common.security.plain.PlainLoginModule required 
username=\"user\" password=\"pass\";";
+        contextRunner
+                .withPropertyValues(
+                        "spring.kafka.properties.sasl.jaas.config=" + 
jaasConfig,
+                        "camel.component.kafka.enabled=true")
+                .run(context -> {
+                    KafkaComponent kafka = context.getBean(CamelContext.class)
+                            .getComponent("kafka", KafkaComponent.class);
+                    assertThat(kafka.getConfiguration().getSaslJaasConfig())
+                            .isEqualTo(jaasConfig);
+                });
+    }
+
+    @Test
+    void shouldNotBridgeWhenSpringKafkaNotConfigured() {
+        contextRunner
+                .withPropertyValues(
+                        "camel.component.kafka.enabled=true")
+                .run(context -> {
+                    KafkaComponent kafka = context.getBean(CamelContext.class)
+                            .getComponent("kafka", KafkaComponent.class);
+                    // Default value should remain
+                    assertThat(kafka.getConfiguration().getSecurityProtocol())
+                            .isEqualTo("PLAINTEXT");
+                });
+    }
+
+    @Test
+    void shouldBridgeClientId() {
+        contextRunner
+                .withPropertyValues(
+                        "spring.kafka.client-id=my-client",
+                        "camel.component.kafka.enabled=true")
+                .run(context -> {
+                    KafkaComponent kafka = context.getBean(CamelContext.class)
+                            .getComponent("kafka", KafkaComponent.class);
+                    assertThat(kafka.getConfiguration().getClientId())
+                            .isEqualTo("my-client");
+                });
+    }
+}

Reply via email to