This is an automated email from the ASF dual-hosted git repository. jamesnetherton pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
The following commit(s) were added to refs/heads/main by this push: new 6e440e0 Add Quarkus service binding support to Kafka extension 6e440e0 is described below commit 6e440e02573a2ae136983454f345424e85cd0083 Author: James Netherton <jamesnether...@gmail.com> AuthorDate: Thu Apr 15 14:18:39 2021 +0100 Add Quarkus service binding support to Kafka extension Fixes #2333 --- .../component/kafka/deployment/KafkaProcessor.java | 20 +++ .../component/kafka/CamelKafkaRecorder.java | 49 ++++++++ .../component/kafka/QuarkusKafkaClientFactory.java | 69 +++++++++++ integration-tests/kafka-sasl/pom.xml | 134 +++++++++++++++++++++ .../quarkus/kafka/sasl/KafkaSaslResource.java | 71 +++++++++++ .../camel/quarkus/kafka/sasl/KafkaSaslRoutes.java | 19 +-- .../camel/quarkus/kafka/sasl/KafkaSupport.java | 78 ++++++++++++ .../src/main/resources/application.properties | 18 +++ .../quarkus/kafka/sasl/KafkaSaslBindingTest.java | 55 +++++++++ .../camel/quarkus/kafka/sasl/KafkaSaslIT.java | 14 +-- .../quarkus/kafka/sasl/KafkaSaslTestResource.java | 111 +++++++++++++++++ .../test/resources/config/kafka_server_jaas.conf | 7 ++ .../src/test/resources/k8s-sb/kafka/password | 1 + .../src/test/resources/k8s-sb/kafka/saslMechanism | 1 + .../test/resources/k8s-sb/kafka/securityProtocol | 1 + .../src/test/resources/k8s-sb/kafka/type | 1 + .../src/test/resources/k8s-sb/kafka/user | 1 + integration-tests/pom.xml | 1 + pom.xml | 1 + tooling/scripts/test-categories.yaml | 1 + 20 files changed, 634 insertions(+), 19 deletions(-) diff --git a/extensions/kafka/deployment/src/main/java/org/apache/camel/quarkus/component/kafka/deployment/KafkaProcessor.java b/extensions/kafka/deployment/src/main/java/org/apache/camel/quarkus/component/kafka/deployment/KafkaProcessor.java index a0a5359..270b340 100644 --- a/extensions/kafka/deployment/src/main/java/org/apache/camel/quarkus/component/kafka/deployment/KafkaProcessor.java +++ b/extensions/kafka/deployment/src/main/java/org/apache/camel/quarkus/component/kafka/deployment/KafkaProcessor.java @@ -16,8 +16,16 @@ */ package org.apache.camel.quarkus.component.kafka.deployment; +import java.util.List; + import io.quarkus.deployment.annotations.BuildStep; +import io.quarkus.deployment.annotations.ExecutionTime; +import io.quarkus.deployment.annotations.Record; import io.quarkus.deployment.builditem.FeatureBuildItem; +import io.quarkus.deployment.builditem.nativeimage.ServiceProviderBuildItem; +import org.apache.camel.component.kafka.KafkaComponent; +import org.apache.camel.quarkus.component.kafka.CamelKafkaRecorder; +import org.apache.camel.quarkus.core.deployment.spi.CamelRuntimeBeanBuildItem; class KafkaProcessor { private static final String FEATURE = "camel-kafka"; @@ -26,4 +34,16 @@ class KafkaProcessor { FeatureBuildItem feature() { return new FeatureBuildItem(FEATURE); } + + @BuildStep + @Record(ExecutionTime.RUNTIME_INIT) + CamelRuntimeBeanBuildItem createCamelKafkaComponent( + CamelKafkaRecorder recorder, + // We want Quarkus to configure the ServiceBindingConverter bits before this step + List<ServiceProviderBuildItem> serviceProviders) { + return new CamelRuntimeBeanBuildItem( + "kafka", + KafkaComponent.class.getName(), + recorder.createKafkaComponent()); + } } diff --git a/extensions/kafka/runtime/src/main/java/org/apache/camel/quarkus/component/kafka/CamelKafkaRecorder.java b/extensions/kafka/runtime/src/main/java/org/apache/camel/quarkus/component/kafka/CamelKafkaRecorder.java new file mode 100644 index 0000000..abc7bed --- /dev/null +++ b/extensions/kafka/runtime/src/main/java/org/apache/camel/quarkus/component/kafka/CamelKafkaRecorder.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.component.kafka; + +import java.util.Collections; +import java.util.Map; + +import io.quarkus.arc.Arc; +import io.quarkus.arc.InstanceHandle; +import io.quarkus.runtime.RuntimeValue; +import io.quarkus.runtime.annotations.Recorder; +import org.apache.camel.component.kafka.KafkaComponent; + +@Recorder +public class CamelKafkaRecorder { + + @SuppressWarnings("unchecked") + public RuntimeValue<KafkaComponent> createKafkaComponent() { + final KafkaComponent component = new KafkaComponent(); + final InstanceHandle<Object> instance = Arc.container().instance("default-kafka-broker"); + Map<String, Object> kafkaConfig; + + if (instance.isAvailable()) { + kafkaConfig = (Map<String, Object>) instance.get(); + } else { + kafkaConfig = Collections.emptyMap(); + } + + // TODO: Return new RuntimeValue<>(quarkusKafkaClientFactory) as the KafkaClientFactory option should be autowired + // https://issues.apache.org/jira/browse/CAMEL-16500 + QuarkusKafkaClientFactory quarkusKafkaClientFactory = new QuarkusKafkaClientFactory(kafkaConfig); + component.setKafkaClientFactory(quarkusKafkaClientFactory); + return new RuntimeValue<>(component); + } +} diff --git a/extensions/kafka/runtime/src/main/java/org/apache/camel/quarkus/component/kafka/QuarkusKafkaClientFactory.java b/extensions/kafka/runtime/src/main/java/org/apache/camel/quarkus/component/kafka/QuarkusKafkaClientFactory.java new file mode 100644 index 0000000..edf4964 --- /dev/null +++ b/extensions/kafka/runtime/src/main/java/org/apache/camel/quarkus/component/kafka/QuarkusKafkaClientFactory.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.component.kafka; + +import java.util.Map; +import java.util.Properties; + +import org.apache.camel.component.kafka.DefaultKafkaClientFactory; +import org.apache.camel.component.kafka.KafkaConfiguration; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; + +/** + * Custom {@link org.apache.camel.component.kafka.KafkaClientFactory} to enable Kafka configuration properties + * discovered by Quarkus to be merged with those configured from the Camel Kafka component and endpoint URI options. + */ +public class QuarkusKafkaClientFactory extends DefaultKafkaClientFactory { + + private final Map<String, Object> quarkusKafkaConfiguration; + + public QuarkusKafkaClientFactory(Map<String, Object> quarkusKafkaConfiguration) { + this.quarkusKafkaConfiguration = quarkusKafkaConfiguration; + } + + @Override + public KafkaProducer getProducer(Properties camelKafkaProperties) { + mergeConfiguration(camelKafkaProperties); + return super.getProducer(camelKafkaProperties); + } + + @Override + public KafkaConsumer getConsumer(Properties camelKafkaProperties) { + mergeConfiguration(camelKafkaProperties); + return super.getConsumer(camelKafkaProperties); + } + + @Override + public String getBrokers(KafkaConfiguration configuration) { + String brokers = (String) quarkusKafkaConfiguration.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG); + return brokers != null ? brokers : super.getBrokers(configuration); + } + + /** + * Merges kafka configuration properties discovered by Quarkus with those provided via the + * component & endpoint URI options. + */ + private void mergeConfiguration(Properties camelKafkaProperties) { + if (quarkusKafkaConfiguration != null) { + for (Map.Entry<String, Object> entry : quarkusKafkaConfiguration.entrySet()) { + camelKafkaProperties.put(entry.getKey(), entry.getValue()); + } + } + } +} diff --git a/integration-tests/kafka-sasl/pom.xml b/integration-tests/kafka-sasl/pom.xml new file mode 100644 index 0000000..05ddd48 --- /dev/null +++ b/integration-tests/kafka-sasl/pom.xml @@ -0,0 +1,134 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-integration-tests</artifactId> + <version>1.9.0-SNAPSHOT</version> + </parent> + + <artifactId>camel-quarkus-integration-test-kafka-sasl</artifactId> + <name>Camel Quarkus :: Integration Tests :: Kafka SASL</name> + <description>Integration tests for Camel Quarkus Kafka SASL</description> + + <dependencies> + <dependency> + <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-kafka</artifactId> + </dependency> + <dependency> + <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-log</artifactId> + </dependency> + <dependency> + <groupId>io.quarkus</groupId> + <artifactId>quarkus-resteasy</artifactId> + </dependency> + <dependency> + <groupId>io.quarkus</groupId> + <artifactId>quarkus-resteasy-jsonb</artifactId> + </dependency> + <dependency> + <groupId>io.quarkus</groupId> + <artifactId>quarkus-jackson</artifactId> + </dependency> + <dependency> + <groupId>io.quarkus</groupId> + <artifactId>quarkus-kubernetes-service-binding</artifactId> + </dependency> + + <!-- test dependencies --> + <dependency> + <groupId>io.rest-assured</groupId> + <artifactId>rest-assured</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-integration-testcontainers-support</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>kafka</artifactId> + <scope>test</scope> + </dependency> + + <!-- The following dependencies guarantee that this module is built after them. You can update them by running `mvn process-resources -Pformat -N` from the source tree root directory --> + <dependency> + <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-kafka-deployment</artifactId> + <version>${project.version}</version> + <type>pom</type> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-log-deployment</artifactId> + <version>${project.version}</version> + <type>pom</type> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + + <profiles> + <profile> + <id>native</id> + <activation> + <property> + <name>native</name> + </property> + </activation> + <properties> + <quarkus.package.type>native</quarkus.package.type> + </properties> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-failsafe-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>integration-test</goal> + <goal>verify</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> + +</project> diff --git a/integration-tests/kafka-sasl/src/main/java/org/apache/camel/quarkus/kafka/sasl/KafkaSaslResource.java b/integration-tests/kafka-sasl/src/main/java/org/apache/camel/quarkus/kafka/sasl/KafkaSaslResource.java new file mode 100644 index 0000000..0c01029 --- /dev/null +++ b/integration-tests/kafka-sasl/src/main/java/org/apache/camel/quarkus/kafka/sasl/KafkaSaslResource.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.kafka.sasl; + +import java.time.Duration; + +import javax.enterprise.context.ApplicationScoped; +import javax.json.Json; +import javax.json.JsonObject; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; + +@Path("/test") +@ApplicationScoped +public class KafkaSaslResource { + + @Path("/kafka/{topicName}") + @POST + @Produces(MediaType.APPLICATION_JSON) + public JsonObject post(@PathParam("topicName") String topicName, String message) throws Exception { + try (Producer<Integer, String> producer = KafkaSupport.createProducer()) { + RecordMetadata meta = producer.send(new ProducerRecord<>(topicName, 1, message)).get(); + + return Json.createObjectBuilder() + .add("topicName", meta.topic()) + .add("partition", meta.partition()) + .add("offset", meta.offset()) + .build(); + } + } + + @Path("/kafka/{topicName}") + @GET + @Produces(MediaType.APPLICATION_JSON) + public JsonObject get(@PathParam("topicName") String topicName) { + try (KafkaConsumer<Integer, String> consumer = KafkaSupport.createConsumer(topicName)) { + ConsumerRecord<Integer, String> record = consumer.poll(Duration.ofSeconds(60)).iterator().next(); + return Json.createObjectBuilder() + .add("topicName", record.topic()) + .add("partition", record.partition()) + .add("offset", record.offset()) + .add("key", record.key()) + .add("body", record.value()) + .build(); + } + } +} diff --git a/extensions/kafka/deployment/src/main/java/org/apache/camel/quarkus/component/kafka/deployment/KafkaProcessor.java b/integration-tests/kafka-sasl/src/main/java/org/apache/camel/quarkus/kafka/sasl/KafkaSaslRoutes.java similarity index 63% copy from extensions/kafka/deployment/src/main/java/org/apache/camel/quarkus/component/kafka/deployment/KafkaProcessor.java copy to integration-tests/kafka-sasl/src/main/java/org/apache/camel/quarkus/kafka/sasl/KafkaSaslRoutes.java index a0a5359..8bd31f4 100644 --- a/extensions/kafka/deployment/src/main/java/org/apache/camel/quarkus/component/kafka/deployment/KafkaProcessor.java +++ b/integration-tests/kafka-sasl/src/main/java/org/apache/camel/quarkus/kafka/sasl/KafkaSaslRoutes.java @@ -14,16 +14,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.camel.quarkus.component.kafka.deployment; +package org.apache.camel.quarkus.kafka.sasl; -import io.quarkus.deployment.annotations.BuildStep; -import io.quarkus.deployment.builditem.FeatureBuildItem; +import org.apache.camel.builder.RouteBuilder; -class KafkaProcessor { - private static final String FEATURE = "camel-kafka"; - - @BuildStep - FeatureBuildItem feature() { - return new FeatureBuildItem(FEATURE); +public class KafkaSaslRoutes extends RouteBuilder { + @Override + public void configure() throws Exception { + // Note: kafka component configuration is done via quarkus-kubernetes-service-binding. + // See configuration in src/test/resources/k8s-sb/kafka + from("kafka:inbound") + .to("log:kafka") + .to("kafka:outbound"); } } diff --git a/integration-tests/kafka-sasl/src/main/java/org/apache/camel/quarkus/kafka/sasl/KafkaSupport.java b/integration-tests/kafka-sasl/src/main/java/org/apache/camel/quarkus/kafka/sasl/KafkaSupport.java new file mode 100644 index 0000000..83bc191 --- /dev/null +++ b/integration-tests/kafka-sasl/src/main/java/org/apache/camel/quarkus/kafka/sasl/KafkaSupport.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.kafka.sasl; + +import java.util.Collections; +import java.util.Properties; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.eclipse.microprofile.config.Config; +import org.eclipse.microprofile.config.ConfigProvider; + +public final class KafkaSupport { + + private KafkaSupport() { + } + + public static KafkaConsumer<Integer, String> createConsumer(String topicName) { + Properties props = new Properties(); + setConfigProperty(props, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG); + setConfigProperty(props, SaslConfigs.SASL_MECHANISM); + setConfigProperty(props, SaslConfigs.SASL_JAAS_CONFIG); + setConfigProperty(props, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG); + + props.put(ConsumerConfig.GROUP_ID_CONFIG, "test"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props); + consumer.subscribe(Collections.singletonList(topicName)); + + return consumer; + } + + public static Producer<Integer, String> createProducer() { + Properties props = new Properties(); + setConfigProperty(props, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG); + setConfigProperty(props, SaslConfigs.SASL_MECHANISM); + setConfigProperty(props, SaslConfigs.SASL_JAAS_CONFIG); + setConfigProperty(props, CommonClientConfigs.SECURITY_PROTOCOL_CONFIG); + + props.put(ProducerConfig.CLIENT_ID_CONFIG, "test-consumer"); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + + return new KafkaProducer<>(props); + } + + private static void setConfigProperty(Properties props, String key) { + Config config = ConfigProvider.getConfig(); + props.put(key, config.getValue("kafka." + key, String.class)); + } +} diff --git a/integration-tests/kafka-sasl/src/main/resources/application.properties b/integration-tests/kafka-sasl/src/main/resources/application.properties new file mode 100644 index 0000000..c7c9f6b --- /dev/null +++ b/integration-tests/kafka-sasl/src/main/resources/application.properties @@ -0,0 +1,18 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- + +quarkus.kubernetes-service-binding.root=${java.io.tmpdir}/k8s-sb diff --git a/integration-tests/kafka-sasl/src/test/java/org/apache/camel/quarkus/kafka/sasl/KafkaSaslBindingTest.java b/integration-tests/kafka-sasl/src/test/java/org/apache/camel/quarkus/kafka/sasl/KafkaSaslBindingTest.java new file mode 100644 index 0000000..c1e754b --- /dev/null +++ b/integration-tests/kafka-sasl/src/test/java/org/apache/camel/quarkus/kafka/sasl/KafkaSaslBindingTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.kafka.sasl; + +import java.util.UUID; + +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.junit.QuarkusTest; +import io.restassured.RestAssured; +import io.restassured.path.json.JsonPath; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +@QuarkusTest +@QuarkusTestResource(KafkaSaslTestResource.class) +public class KafkaSaslBindingTest { + + @Test + void testKafkaBridge() { + String body = UUID.randomUUID().toString(); + + RestAssured.given() + .contentType("text/plain") + .body(body) + .post("/test/kafka/inbound") + .then() + .statusCode(200); + + JsonPath result = RestAssured.given() + .get("/test/kafka/outbound") + .then() + .statusCode(200) + .extract() + .body() + .jsonPath(); + + assertThat(result.getString("topicName")).isEqualTo("outbound"); + assertThat(result.getString("body")).isEqualTo(body); + } +} diff --git a/extensions/kafka/deployment/src/main/java/org/apache/camel/quarkus/component/kafka/deployment/KafkaProcessor.java b/integration-tests/kafka-sasl/src/test/java/org/apache/camel/quarkus/kafka/sasl/KafkaSaslIT.java similarity index 69% copy from extensions/kafka/deployment/src/main/java/org/apache/camel/quarkus/component/kafka/deployment/KafkaProcessor.java copy to integration-tests/kafka-sasl/src/test/java/org/apache/camel/quarkus/kafka/sasl/KafkaSaslIT.java index a0a5359..6b25cb4 100644 --- a/extensions/kafka/deployment/src/main/java/org/apache/camel/quarkus/component/kafka/deployment/KafkaProcessor.java +++ b/integration-tests/kafka-sasl/src/test/java/org/apache/camel/quarkus/kafka/sasl/KafkaSaslIT.java @@ -14,16 +14,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.camel.quarkus.component.kafka.deployment; +package org.apache.camel.quarkus.kafka.sasl; -import io.quarkus.deployment.annotations.BuildStep; -import io.quarkus.deployment.builditem.FeatureBuildItem; +import io.quarkus.test.junit.NativeImageTest; -class KafkaProcessor { - private static final String FEATURE = "camel-kafka"; - - @BuildStep - FeatureBuildItem feature() { - return new FeatureBuildItem(FEATURE); - } +@NativeImageTest +public class KafkaSaslIT extends KafkaSaslBindingTest { } diff --git a/integration-tests/kafka-sasl/src/test/java/org/apache/camel/quarkus/kafka/sasl/KafkaSaslTestResource.java b/integration-tests/kafka-sasl/src/test/java/org/apache/camel/quarkus/kafka/sasl/KafkaSaslTestResource.java new file mode 100644 index 0000000..c6410ab --- /dev/null +++ b/integration-tests/kafka-sasl/src/test/java/org/apache/camel/quarkus/kafka/sasl/KafkaSaslTestResource.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.kafka.sasl; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.Map; + +import com.github.dockerjava.api.command.InspectContainerResponse; +import io.quarkus.test.common.QuarkusTestResourceLifecycleManager; +import org.apache.kafka.clients.CommonClientConfigs; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.shaded.org.apache.commons.io.FileUtils; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.MountableFile; + +public class KafkaSaslTestResource implements QuarkusTestResourceLifecycleManager { + + private static final File TMP_DIR = Paths.get(System.getProperty("java.io.tmpdir"), "k8s-sb", "kafka").toFile(); + private SaslKafkaContainer container; + + @Override + public Map<String, String> start() { + // Set up the service binding directory + try { + TMP_DIR.mkdirs(); + + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + URL resource = classLoader.getResource("k8s-sb/kafka"); + File serviceBindings = new File(resource.getPath()); + + for (File serviceBinding : serviceBindings.listFiles()) { + URL serviceBindingResource = classLoader.getResource("k8s-sb/kafka/" + serviceBinding.getName()); + FileUtils.copyInputStreamToFile(serviceBindingResource.openStream(), + Paths.get(TMP_DIR.getPath(), serviceBinding.getName()).toFile()); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + + DockerImageName imageName = DockerImageName.parse("confluentinc/cp-kafka").withTag("5.4.3"); + container = new SaslKafkaContainer(imageName); + container.start(); + return Collections.singletonMap("kafka." + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, + container.getBootstrapServers()); + } + + @Override + public void stop() { + if (this.container != null) { + try { + this.container.stop(); + FileUtils.deleteDirectory(TMP_DIR.getParentFile()); + } catch (Exception e) { + // Ignored + } + } + } + + // KafkaContainer does not support SASL OOTB so we need some customizations + static final class SaslKafkaContainer extends KafkaContainer { + + SaslKafkaContainer(final DockerImageName dockerImageName) { + super(dockerImageName); + + String protocolMap = "SASL_PLAINTEXT:SASL_PLAINTEXT,BROKER:SASL_PLAINTEXT"; + String listeners = "SASL_PLAINTEXT://0.0.0.0:" + KAFKA_PORT + ",BROKER://0.0.0.0:9092"; + + withEnv("KAFKA_OPTS", "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"); + withEnv("KAFKA_LISTENERS", listeners); + withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", protocolMap); + withEnv("KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE", "false"); + withEnv("KAFKA_SASL_ENABLED_MECHANISMS", "PLAIN"); + withEnv("ZOOKEEPER_SASL_ENABLED", "false"); + withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER"); + withEnv("KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL", "PLAIN"); + withEmbeddedZookeeper().waitingFor(Wait.forListeningPort()); + } + + @Override + public String getBootstrapServers() { + return String.format("SASL_PLAINTEXT://%s:%s", getHost(), getMappedPort(KAFKA_PORT)); + } + + @Override + protected void containerIsStarting(InspectContainerResponse containerInfo, boolean reused) { + super.containerIsStarting(containerInfo, reused); + copyFileToContainer( + MountableFile.forClasspathResource("config/kafka_server_jaas.conf"), + "/etc/kafka/kafka_server_jaas.conf"); + } + } +} diff --git a/integration-tests/kafka-sasl/src/test/resources/config/kafka_server_jaas.conf b/integration-tests/kafka-sasl/src/test/resources/config/kafka_server_jaas.conf new file mode 100644 index 0000000..3879db9 --- /dev/null +++ b/integration-tests/kafka-sasl/src/test/resources/config/kafka_server_jaas.conf @@ -0,0 +1,7 @@ +KafkaServer { + org.apache.kafka.common.security.plain.PlainLoginModule required + username="admin" + password="admin-secret" + user_admin="admin-secret" + user_alice="alice-secret"; +}; \ No newline at end of file diff --git a/integration-tests/kafka-sasl/src/test/resources/k8s-sb/kafka/password b/integration-tests/kafka-sasl/src/test/resources/k8s-sb/kafka/password new file mode 100644 index 0000000..f1f4c76 --- /dev/null +++ b/integration-tests/kafka-sasl/src/test/resources/k8s-sb/kafka/password @@ -0,0 +1 @@ +alice-secret \ No newline at end of file diff --git a/integration-tests/kafka-sasl/src/test/resources/k8s-sb/kafka/saslMechanism b/integration-tests/kafka-sasl/src/test/resources/k8s-sb/kafka/saslMechanism new file mode 100644 index 0000000..b547f4a --- /dev/null +++ b/integration-tests/kafka-sasl/src/test/resources/k8s-sb/kafka/saslMechanism @@ -0,0 +1 @@ +PLAIN \ No newline at end of file diff --git a/integration-tests/kafka-sasl/src/test/resources/k8s-sb/kafka/securityProtocol b/integration-tests/kafka-sasl/src/test/resources/k8s-sb/kafka/securityProtocol new file mode 100644 index 0000000..459a574 --- /dev/null +++ b/integration-tests/kafka-sasl/src/test/resources/k8s-sb/kafka/securityProtocol @@ -0,0 +1 @@ +SASL_PLAINTEXT \ No newline at end of file diff --git a/integration-tests/kafka-sasl/src/test/resources/k8s-sb/kafka/type b/integration-tests/kafka-sasl/src/test/resources/k8s-sb/kafka/type new file mode 100644 index 0000000..d2651b5 --- /dev/null +++ b/integration-tests/kafka-sasl/src/test/resources/k8s-sb/kafka/type @@ -0,0 +1 @@ +kafka \ No newline at end of file diff --git a/integration-tests/kafka-sasl/src/test/resources/k8s-sb/kafka/user b/integration-tests/kafka-sasl/src/test/resources/k8s-sb/kafka/user new file mode 100644 index 0000000..ca56b59 --- /dev/null +++ b/integration-tests/kafka-sasl/src/test/resources/k8s-sb/kafka/user @@ -0,0 +1 @@ +alice \ No newline at end of file diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index c55782b..1784c79 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -135,6 +135,7 @@ <module>jsonpath</module> <module>jta</module> <module>kafka</module> + <module>kafka-sasl</module> <module>kamelet</module> <module>kotlin</module> <module>kubernetes</module> diff --git a/pom.xml b/pom.xml index 47b92de..deb94f5 100644 --- a/pom.xml +++ b/pom.xml @@ -477,6 +477,7 @@ <exclude>**/generated/**</exclude> <exclude>.envrc</exclude> <exclude>**/.idea/**</exclude> + <exclude>**/k8s-sb/**</exclude> </excludes> <mapping> <groovy>SLASHSTAR_STYLE</groovy> diff --git a/tooling/scripts/test-categories.yaml b/tooling/scripts/test-categories.yaml index 1e56fef..ab8ce67 100644 --- a/tooling/scripts/test-categories.yaml +++ b/tooling/scripts/test-categories.yaml @@ -124,6 +124,7 @@ messaging-networking1: - activemq - amqp - kafka + - kafka-sasl - messaging - nats - splunk