This is an automated email from the ASF dual-hosted git repository.
fmariani pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 978b6665b24 Adding confluent kafka instance type
978b6665b24 is described below
commit 978b6665b249c9f8071224bb4defbed4167254d3
Author: Salvatore Mongiardo <[email protected]>
AuthorDate: Thu Apr 17 10:56:23 2025 +0200
Adding confluent kafka instance type
---
.../apache/camel/catalog/test-infra/metadata.json | 9 +++
.../src/generated/resources/META-INF/metadata.json | 9 +++
.../test/infra/kafka/common/KafkaProperties.java | 1 +
.../infra/kafka/services/ConfluentContainer.java | 91 +++++++++++++++++++++
.../kafka/services/ConfluentInfraService.java | 93 ++++++++++++++++++++++
.../infra/kafka/services/KafkaServiceFactory.java | 6 ++
6 files changed, 209 insertions(+)
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/test-infra/metadata.json
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/test-infra/metadata.json
index 0e030762fa2..6ae17303bd6 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/test-infra/metadata.json
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/test-infra/metadata.json
@@ -124,6 +124,15 @@
"groupId" : "org.apache.camel",
"artifactId" : "camel-test-infra-kafka",
"version" : "4.12.0-SNAPSHOT"
+}, {
+ "service" : "org.apache.camel.test.infra.kafka.services.KafkaInfraService",
+ "description" : "Apache Kafka, Distributed event streaming platform",
+ "implementation" :
"org.apache.camel.test.infra.kafka.services.ConfluentInfraService",
+ "alias" : [ "kafka" ],
+ "aliasImplementation" : [ "confluent" ],
+ "groupId" : "org.apache.camel",
+ "artifactId" : "camel-test-infra-kafka",
+ "version" : "4.12.0-SNAPSHOT"
}, {
"service" : "org.apache.camel.test.infra.nats.services.NatsInfraService",
"description" : "Messaging Platform NATS",
diff --git
a/test-infra/camel-test-infra-all/src/generated/resources/META-INF/metadata.json
b/test-infra/camel-test-infra-all/src/generated/resources/META-INF/metadata.json
index 0e030762fa2..6ae17303bd6 100644
---
a/test-infra/camel-test-infra-all/src/generated/resources/META-INF/metadata.json
+++
b/test-infra/camel-test-infra-all/src/generated/resources/META-INF/metadata.json
@@ -124,6 +124,15 @@
"groupId" : "org.apache.camel",
"artifactId" : "camel-test-infra-kafka",
"version" : "4.12.0-SNAPSHOT"
+}, {
+ "service" : "org.apache.camel.test.infra.kafka.services.KafkaInfraService",
+ "description" : "Apache Kafka, Distributed event streaming platform",
+ "implementation" :
"org.apache.camel.test.infra.kafka.services.ConfluentInfraService",
+ "alias" : [ "kafka" ],
+ "aliasImplementation" : [ "confluent" ],
+ "groupId" : "org.apache.camel",
+ "artifactId" : "camel-test-infra-kafka",
+ "version" : "4.12.0-SNAPSHOT"
}, {
"service" : "org.apache.camel.test.infra.nats.services.NatsInfraService",
"description" : "Messaging Platform NATS",
diff --git
a/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/common/KafkaProperties.java
b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/common/KafkaProperties.java
index 1b936895073..ce4de407b29 100644
---
a/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/common/KafkaProperties.java
+++
b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/common/KafkaProperties.java
@@ -20,6 +20,7 @@ package org.apache.camel.test.infra.kafka.common;
public final class KafkaProperties {
public static final String KAFKA_BOOTSTRAP_SERVERS =
"kafka.bootstrap.servers";
public static final String KAFKA_ZOOKEEPER_ADDRESS =
"kafka.zookeeper.address";
+ public static final String CONFLUENT_CONTAINER =
"confluent.container.image";
public static final String KAFKA_CONTAINER = "kafka.container";
public static final String KAFKA3_CONTAINER = "kafka3.container";
public static final String REDPANDA_CONTAINER = "redpanda.container.image";
diff --git
a/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/ConfluentContainer.java
b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/ConfluentContainer.java
new file mode 100644
index 00000000000..42d60218c53
--- /dev/null
+++
b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/ConfluentContainer.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.test.infra.kafka.services;
+
+import java.util.UUID;
+
+import com.github.dockerjava.api.command.CreateContainerCmd;
+import org.apache.camel.test.infra.common.LocalPropertyResolver;
+import org.apache.camel.test.infra.kafka.common.KafkaProperties;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.wait.strategy.Wait;
+
+/**
+ * Testcontainers definition of a single-node Confluent Kafka broker.
+ * <p>
+ * The node is configured with the combined {@code broker,controller} process roles and an empty
+ * ZooKeeper connect string. Three listeners are declared inside the container: {@code PLAINTEXT}
+ * on 9092 (client traffic), {@code BROKER} on 9093 (inter-broker) and {@code CONTROLLER} on 9094.
+ * <p>
+ * The default image name is looked up through {@link LocalPropertyResolver} using the
+ * {@link KafkaProperties#CONFLUENT_CONTAINER} key.
+ */
+public class ConfluentContainer extends GenericContainer<ConfluentContainer> {
+    static final String CONFLUENT_CONTAINER = LocalPropertyResolver.getProperty(
+            ConfluentContainer.class,
+            KafkaProperties.CONFLUENT_CONTAINER);
+    private static final int KAFKA_PORT = 9092;
+
+    /**
+     * Creates the container using the default image ({@link #CONFLUENT_CONTAINER}).
+     *
+     * @param network the network the container is attached to
+     * @param name    used both as the container name and as its host name
+     */
+    public ConfluentContainer(Network network, String name) {
+        this(network, name, CONFLUENT_CONTAINER);
+    }
+
+    /**
+     * Creates the container from an explicit image.
+     *
+     * @param network       the network the container is attached to
+     * @param name          used both as the container name and as its host name
+     * @param containerName the image name passed to {@link GenericContainer}
+     */
+    public ConfluentContainer(Network network, String name, String containerName) {
+        super(containerName);
+
+        withEnv("LOG_DIR", "/tmp/logs")
+                .withExposedPorts(KAFKA_PORT)
+                .withEnv("KAFKA_BROKER_ID", "1")
+                .withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
+                        "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT")
+                // Clients are told to connect to <docker host>:9092; start() pins that host port.
+                .withEnv("KAFKA_ADVERTISED_LISTENERS",
+                        String.format("PLAINTEXT://%s:9092,BROKER://%s:9093", getHost(), getHost()))
+                .withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0")
+                .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
+                .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
+                .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
+                .withEnv("KAFKA_PROCESS_ROLES", "broker,controller")
+                .withEnv("KAFKA_NODE_ID", "1")
+                // Voter entries have the form <node.id>@<host>:<port>: the id must equal KAFKA_NODE_ID
+                // and the port must be the CONTROLLER listener port declared in KAFKA_LISTENERS.
+                .withEnv("KAFKA_CONTROLLER_QUORUM_VOTERS", "1@localhost:9094")
+                .withEnv("KAFKA_LISTENERS",
+                        "PLAINTEXT://0.0.0.0:9092,BROKER://0.0.0.0:9093,CONTROLLER://0.0.0.0:9094")
+                .withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER")
+                .withEnv("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER")
+                .withEnv("KAFKA_LOG_DIRS", "/tmp/kraft-combined-logs")
+                .withEnv("KAFKA_REST_HOST_NAME", "rest-proxy")
+                .withEnv("KAFKA_REST_LISTENERS", String.format("http://%s:9092", getHost()))
+                .withEnv("KAFKA_REST_BOOTSTRAP_SERVERS", "localhost:9092")
+                // The value is the search path only; a leading "PATH=" would become part of the first entry.
+                .withEnv("PATH", "/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin")
+                .withEnv("container", "oci")
+                .withEnv("LANG", "C.UTF-8")
+                .withEnv("UB_CLASSPATH", "/usr/share/java/cp-base-lite/*")
+                .withEnv("KAFKA_ZOOKEEPER_CONNECT", "")
+                // A fresh 22-character id per container instance, derived from a random UUID.
+                .withEnv("CLUSTER_ID", UUID.randomUUID().toString().replace("-", "").substring(0, 22))
+                .withNetwork(network)
+                .withCreateContainerCmdModifier(createContainerCmd -> setupContainer(name, createContainerCmd))
+                .withCommand("sh", "-c", "/etc/confluent/docker/run")
+                .waitingFor(Wait.forLogMessage(".*Kafka Server started.*", 1));
+    }
+
+    /** Applies {@code name} as both the host name and the container name on the create command. */
+    private void setupContainer(String name, CreateContainerCmd createContainerCmd) {
+        createContainerCmd.withHostName(name);
+        createContainerCmd.withName(name);
+    }
+
+    /**
+     * @return the host port mapped to the broker's client port (9092)
+     */
+    public int getKafkaPort() {
+        return getMappedPort(KAFKA_PORT);
+    }
+
+    /**
+     * Binds host port 9092 to container port 9092 before starting, so that the address published
+     * through {@code KAFKA_ADVERTISED_LISTENERS} ({@code <host>:9092}) is the one clients can reach.
+     */
+    @Override
+    public void start() {
+        addFixedExposedPort(KAFKA_PORT, KAFKA_PORT);
+        super.start();
+    }
+}
diff --git
a/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/ConfluentInfraService.java
b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/ConfluentInfraService.java
new file mode 100644
index 00000000000..42c79fc717b
--- /dev/null
+++
b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/ConfluentInfraService.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.test.infra.kafka.services;
+
+import org.apache.camel.spi.annotations.InfraService;
+import org.apache.camel.test.infra.common.TestUtils;
+import org.apache.camel.test.infra.common.services.ContainerService;
+import org.apache.camel.test.infra.kafka.common.KafkaProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Network;
+
+/**
+ * {@link KafkaInfraService} backed by a {@link ConfluentContainer}.
+ * <p>
+ * Starting the service starts the container and publishes the bootstrap address under the
+ * {@link KafkaProperties#KAFKA_BOOTSTRAP_SERVERS} system property.
+ */
+@InfraService(service = KafkaInfraService.class,
+              description = "Apache Kafka, Distributed event streaming platform",
+              serviceAlias = "kafka", serviceImplementationAlias = "confluent")
+public class ConfluentInfraService implements KafkaInfraService, ContainerService<ConfluentContainer> {
+    private static final Logger LOG = LoggerFactory.getLogger(ConfluentInfraService.class);
+
+    private final ConfluentContainer container;
+
+    /** Creates a service whose container is named {@code confluent-<n>}, with {@code n} drawn from the range 1..100. */
+    public ConfluentInfraService() {
+        this("confluent-" + TestUtils.randomWithRange(1, 100));
+    }
+
+    /**
+     * Creates a service with a container of the given name, attached to a newly created network.
+     *
+     * @param confluentInstanceName name handed to {@link #initConfluentContainer(Network, String)}
+     */
+    public ConfluentInfraService(String confluentInstanceName) {
+        this.container = initConfluentContainer(Network.newNetwork(), confluentInstanceName);
+    }
+
+    /**
+     * Wraps an already constructed container.
+     *
+     * @param confluentContainer the container this service manages
+     */
+    public ConfluentInfraService(ConfluentContainer confluentContainer) {
+        this.container = confluentContainer;
+    }
+
+    /**
+     * Builds the container instance used by this service.
+     *
+     * @param  network      network to attach the container to
+     * @param  instanceName container name
+     * @return              a new {@link ConfluentContainer}
+     */
+    protected ConfluentContainer initConfluentContainer(Network network, String instanceName) {
+        return new ConfluentContainer(network, instanceName);
+    }
+
+    /**
+     * @return the Kafka port reported by the container
+     */
+    protected Integer getKafkaPort() {
+        return container.getKafkaPort();
+    }
+
+    /**
+     * @return the bootstrap address in {@code host:port} form
+     */
+    @Override
+    public String getBootstrapServers() {
+        final String host = container.getHost();
+        final Integer port = getKafkaPort();
+        return host + ":" + port;
+    }
+
+    /** Publishes the bootstrap address as a system property. */
+    @Override
+    public void registerProperties() {
+        System.setProperty(KafkaProperties.KAFKA_BOOTSTRAP_SERVERS, getBootstrapServers());
+    }
+
+    /** Starts the container, then registers and logs the bootstrap address. */
+    @Override
+    public void initialize() {
+        container.start();
+
+        registerProperties();
+        LOG.info("Kafka bootstrap server running at address {}", getBootstrapServers());
+    }
+
+    /** Condition handed to {@code TestUtils.waitFor} during shutdown: true once the container is no longer running. */
+    private boolean isStopped() {
+        return !container.isRunning();
+    }
+
+    /** Stops the container; the wait on the stopped condition runs even if {@code stop()} throws. */
+    @Override
+    public void shutdown() {
+        try {
+            LOG.info("Stopping Kafka container");
+            container.stop();
+        } finally {
+            TestUtils.waitFor(this::isStopped);
+        }
+    }
+
+    /**
+     * @return the managed container
+     */
+    @Override
+    public ConfluentContainer getContainer() {
+        return container;
+    }
+}
diff --git
a/test-infra/camel-test-infra-kafka/src/test/java/org/apache/camel/test/infra/kafka/services/KafkaServiceFactory.java
b/test-infra/camel-test-infra-kafka/src/test/java/org/apache/camel/test/infra/kafka/services/KafkaServiceFactory.java
index 0ca9ac80075..fc0fa2a7368 100644
---
a/test-infra/camel-test-infra-kafka/src/test/java/org/apache/camel/test/infra/kafka/services/KafkaServiceFactory.java
+++
b/test-infra/camel-test-infra-kafka/src/test/java/org/apache/camel/test/infra/kafka/services/KafkaServiceFactory.java
@@ -60,6 +60,7 @@ public final class KafkaServiceFactory {
return
builder.addLocalMapping(ContainerLocalKafkaService::kafka3Container)
.addMapping("local-strimzi-container", StrimziService::new)
+ .addMapping("local-confluent-container", ConfluentService::new)
.addRemoteMapping(RemoteKafkaService::new)
.addMapping("local-kafka3-container",
ContainerLocalKafkaService::kafka3Container)
.addMapping("local-redpanda-container", RedpandaService::new)
@@ -80,6 +81,8 @@ public final class KafkaServiceFactory {
.addRemoteMapping(RemoteKafkaService::new)
.addMapping("local-kafka3-container",
() -> new
SingletonKafkaService(ContainerLocalKafkaService.kafka3Container(), "kafka3"))
+ .addMapping("local-confluent-container",
+ () -> new SingletonKafkaService(new
ConfluentService(), "confluent"))
.addMapping("local-strimzi-container",
() -> new SingletonKafkaService(new
StrimziService(), "strimzi"))
.addMapping("local-redpanda-container",
@@ -104,6 +107,9 @@ public final class KafkaServiceFactory {
}
}
+ /** Exposes {@link ConfluentInfraService} as a {@link KafkaService} for the factory mappings; adds no behavior of its own. */
+ public static class ConfluentService extends ConfluentInfraService
implements KafkaService {
+ }
+
public static class StrimziService extends StrimziInfraService implements
KafkaService {
}