This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch 23121 in repository https://gitbox.apache.org/repos/asf/camel.git
commit 800f380ec36b98f69853df696036fe35c824b42e Author: Andrea Cosentino <[email protected]> AuthorDate: Tue Mar 3 10:56:23 2026 +0100 CAMEL-23121 - Kafka tests are failing on Jenkins CI since upgrade to Kafka 4.2 Signed-off-by: Andrea Cosentino <[email protected]> --- .../src/test/resources/log4j2.properties | 3 +++ .../infra/kafka/services/ConfluentContainer.java | 30 +++++++++++++++++++--- .../infra/kafka/services/StrimziContainer.java | 27 ++++++++++++++++--- 3 files changed, 53 insertions(+), 7 deletions(-) diff --git a/components/camel-kafka/src/test/resources/log4j2.properties b/components/camel-kafka/src/test/resources/log4j2.properties index f3afe12f0fa0..452b67149cc4 100644 --- a/components/camel-kafka/src/test/resources/log4j2.properties +++ b/components/camel-kafka/src/test/resources/log4j2.properties @@ -27,3 +27,6 @@ appender.stdout.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n rootLogger.level = INFO rootLogger.appenderRef.out.ref = out + +logger.kafka.name = org.apache.kafka +logger.kafka.level = WARN diff --git a/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/ConfluentContainer.java b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/ConfluentContainer.java index 95aa14e095fc..37da65ecad66 100644 --- a/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/ConfluentContainer.java +++ b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/ConfluentContainer.java @@ -17,11 +17,12 @@ package org.apache.camel.test.infra.kafka.services; +import java.io.IOException; +import java.net.ServerSocket; import java.util.UUID; import com.github.dockerjava.api.command.CreateContainerCmd; import org.apache.camel.test.infra.common.LocalPropertyResolver; -import org.apache.camel.test.infra.common.services.ContainerEnvironmentUtil; import org.apache.camel.test.infra.kafka.common.KafkaProperties; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.Network; @@ -45,8 +46,7 @@ public class ConfluentContainer extends GenericContainer<ConfluentContainer> { .withEnv("KAFKA_BROKER_ID", "1") .withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT") - .withEnv("KAFKA_ADVERTISED_LISTENERS", - String.format("PLAINTEXT://%s:9092,BROKER://%s:9093", getHost(), getHost())) + // KAFKA_ADVERTISED_LISTENERS is set dynamically in start() with the correct mapped port .withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0") .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1") .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1") @@ -86,7 +86,29 @@ public class ConfluentContainer extends GenericContainer<ConfluentContainer> { @Override public void start() { - ContainerEnvironmentUtil.configurePort(this, true, KAFKA_PORT); + int hostPort = resolveHostPort(); + withEnv("KAFKA_ADVERTISED_LISTENERS", + String.format("PLAINTEXT://%s:%d,BROKER://localhost:9093", getHost(), hostPort)); super.start(); } + + private int resolveHostPort() { + String suffix = ":" + KAFKA_PORT; + for (String binding : getPortBindings()) { + if (binding.endsWith(suffix)) { + return Integer.parseInt(binding.substring(0, binding.indexOf(':'))); + } + } + int port = findFreePort(); + addFixedExposedPort(port, KAFKA_PORT); + return port; + } + + private static int findFreePort() { + try (ServerSocket socket = new ServerSocket(0)) { + return socket.getLocalPort(); + } catch (IOException e) { + throw new RuntimeException("Failed to find a free port", e); + } + } } diff --git a/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/StrimziContainer.java b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/StrimziContainer.java index d1a0ebff7db4..6575d873b1fa 100644 --- a/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/StrimziContainer.java +++ b/test-infra/camel-test-infra-kafka/src/main/java/org/apache/camel/test/infra/kafka/services/StrimziContainer.java @@ -17,11 +17,12 @@ package org.apache.camel.test.infra.kafka.services; +import java.io.IOException; +import java.net.ServerSocket; import java.util.UUID; import com.github.dockerjava.api.command.CreateContainerCmd; import org.apache.camel.test.infra.common.LocalPropertyResolver; -import org.apache.camel.test.infra.common.services.ContainerEnvironmentUtil; import org.apache.camel.test.infra.kafka.common.KafkaProperties; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.Network; @@ -47,7 +48,6 @@ public class StrimziContainer extends GenericContainer<StrimziContainer> { .withEnv("KAFKA_NODE_ID", "1") .withEnv("KAFKA_PROCESS_ROLES", "broker,controller") .withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093") - .withEnv("KAFKA_ADVERTISED_LISTENERS", String.format("PLAINTEXT://%s:9092", getHost())) .withEnv("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER") .withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT") .withEnv("KAFKA_CONTROLLER_QUORUM_VOTERS", "1@localhost:9093") @@ -84,7 +84,28 @@ public class StrimziContainer extends GenericContainer<StrimziContainer> { @Override public void start() { - ContainerEnvironmentUtil.configurePort(this, true, KAFKA_PORT); + int hostPort = resolveHostPort(); + withEnv("KAFKA_ADVERTISED_LISTENERS", String.format("PLAINTEXT://%s:%d", getHost(), hostPort)); super.start(); } + + private int resolveHostPort() { + String suffix = ":" + KAFKA_PORT; + for (String binding : getPortBindings()) { + if (binding.endsWith(suffix)) { + return Integer.parseInt(binding.substring(0, binding.indexOf(':'))); + } + } + int port = findFreePort(); + addFixedExposedPort(port, KAFKA_PORT); + return port; + } + + private static int findFreePort() { + try (ServerSocket socket = new ServerSocket(0)) { + return socket.getLocalPort(); + } catch (IOException e) { + throw new RuntimeException("Failed to find a free port", e); + } + } }
