This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 307bcdf7a50 CAMEL-20593 use ASF docker image (#15926)
307bcdf7a50 is described below
commit 307bcdf7a50a779d02de3ef15849d3c543d199c1
Author: Jono Morris <[email protected]>
AuthorDate: Sat Oct 12 18:47:47 2024 +1300
CAMEL-20593 use ASF docker image (#15926)
---
...KafkaConsumerAutoInstResumeRouteStrategyIT.java | 1 +
.../KafkaConsumerIdempotentGroupIdIT.java | 30 ++++++++++++++++----
.../integration/KafkaConsumerIdempotentIT.java | 30 ++++++++++++++++----
...kaConsumerIdempotentWithCustomSerializerIT.java | 32 +++++++++++++++------
.../KafkaConsumerIdempotentWithProcessorIT.java | 33 +++++++++++++++++-----
.../kafka/integration/common/KafkaTestUtil.java | 27 ++++++++++++++++++
.../kafka/KafkaIdempotentRepositoryEagerIT.java | 11 +++++++-
.../kafka/KafkaIdempotentRepositoryNonEagerIT.java | 10 ++++++-
.../KafkaIdempotentRepositoryPersistenceIT.java | 15 ++++++++--
.../services/ContainerLocalAuthKafkaService.java | 4 +--
.../kafka/services/ContainerLocalKafkaService.java | 7 ++---
.../test/infra/kafka/services/container.properties | 2 +-
12 files changed, 161 insertions(+), 41 deletions(-)
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAutoInstResumeRouteStrategyIT.java
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAutoInstResumeRouteStrategyIT.java
index ece0f903fe5..8c61cd65eb6 100644
---
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAutoInstResumeRouteStrategyIT.java
+++
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAutoInstResumeRouteStrategyIT.java
@@ -53,6 +53,7 @@ public class KafkaConsumerAutoInstResumeRouteStrategyIT
extends BaseKafkaTestSup
@BeforeEach
public void before() {
Properties props = KafkaTestUtil.getDefaultProperties(service);
+ KafkaTestUtil.createTopic(service, TOPIC, 1);
KafkaProducer<Object, Object> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentGroupIdIT.java
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentGroupIdIT.java
index edd237cef27..316da6c8c51 100644
---
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentGroupIdIT.java
+++
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentGroupIdIT.java
@@ -16,14 +16,17 @@
*/
package org.apache.camel.component.kafka.integration;
-import java.util.Arrays;
+import java.util.Collections;
+import java.util.UUID;
import org.apache.camel.BindToRegistry;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.integration.common.KafkaTestUtil;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
@@ -34,23 +37,38 @@ import static
org.apache.camel.component.kafka.serde.KafkaSerdeHelper.numericHea
@DisabledIfSystemProperty(named = "enable.kafka.consumer.idempotency.tests",
matches = "false")
public class KafkaConsumerIdempotentGroupIdIT extends
KafkaConsumerIdempotentTestSupport {
- public static final String TOPIC = "idempt";
-
+ private static final String TOPIC;
+ private static final String REPOSITORY_TOPIC;
private final int size = 200;
+ static {
+ UUID topicId = UUID.randomUUID();
+ TOPIC = "idempt_" + topicId;
+ REPOSITORY_TOPIC = "TEST_IDEMPOTENT_" + topicId;
+ }
+
+ @BeforeAll
+ public static void createRepositoryTopic() {
+ KafkaTestUtil.createTopic(service, REPOSITORY_TOPIC, 1);
+ }
+
+ @AfterAll
+ public static void removeRepositoryTopic() {
+
kafkaAdminClient.deleteTopics(Collections.singleton(REPOSITORY_TOPIC)).all();
+ }
+
@BindToRegistry("kafkaIdempotentRepository")
private final KafkaIdempotentRepository testIdempotent
- = new KafkaIdempotentRepository("TEST_IDEMPOTENT",
getBootstrapServers(), "test_1");
+ = new KafkaIdempotentRepository(REPOSITORY_TOPIC,
getBootstrapServers(), "test_1");
@BeforeEach
public void before() {
- kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC,
"TEST_IDEMPOTENT")).all();
doSend(size, TOPIC);
}
@AfterEach
public void after() {
- kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC,
"TEST_IDEMPOTENT")).all();
+ kafkaAdminClient.deleteTopics(Collections.singleton(TOPIC)).all();
}
protected RouteBuilder createRouteBuilder() {
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentIT.java
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentIT.java
index 242e2488755..3e979fa52c3 100644
---
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentIT.java
+++
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentIT.java
@@ -16,14 +16,17 @@
*/
package org.apache.camel.component.kafka.integration;
-import java.util.Arrays;
+import java.util.Collections;
+import java.util.UUID;
import org.apache.camel.BindToRegistry;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.integration.common.KafkaTestUtil;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Tag;
@@ -39,23 +42,38 @@ import static
org.apache.camel.component.kafka.serde.KafkaSerdeHelper.numericHea
@Tags({ @Tag("idempotent") })
public class KafkaConsumerIdempotentIT extends
KafkaConsumerIdempotentTestSupport {
- public static final String TOPIC = "idempt";
-
+ private static final String TOPIC;
+ private static final String REPOSITORY_TOPIC;
private final int size = 200;
+ static {
+ UUID topicId = UUID.randomUUID();
+ TOPIC = "idempt_" + topicId;
+ REPOSITORY_TOPIC = "TEST_IDEMPOTENT_" + topicId;
+ }
+
+ @BeforeAll
+ public static void createRepositoryTopic() {
+ KafkaTestUtil.createTopic(service, REPOSITORY_TOPIC, 1);
+ }
+
+ @AfterAll
+ public static void removeRepositoryTopic() {
+
kafkaAdminClient.deleteTopics(Collections.singleton(REPOSITORY_TOPIC)).all();
+ }
+
@BindToRegistry("kafkaIdempotentRepository")
private final KafkaIdempotentRepository testIdempotent
- = new KafkaIdempotentRepository("TEST_IDEMPOTENT",
getBootstrapServers());
+ = new KafkaIdempotentRepository(REPOSITORY_TOPIC,
getBootstrapServers());
@BeforeEach
public void before() {
- kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC,
"TEST_IDEMPOTENT")).all();
doSend(size, TOPIC);
}
@AfterEach
public void after() {
- kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC,
"TEST_IDEMPOTENT")).all();
+ kafkaAdminClient.deleteTopics(Collections.singleton(TOPIC)).all();
}
protected RouteBuilder createRouteBuilder() {
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java
index e95d6f8066a..c504b41ca08 100644
---
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java
+++
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java
@@ -16,36 +16,50 @@
*/
package org.apache.camel.component.kafka.integration;
-import java.util.Arrays;
+import java.util.Collections;
+import java.util.UUID;
import org.apache.camel.BindToRegistry;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.integration.common.KafkaTestUtil;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.*;
public class KafkaConsumerIdempotentWithCustomSerializerIT extends
KafkaConsumerIdempotentTestSupport {
- public static final String TOPIC = "idempt2";
-
+ private static final String TOPIC;
+ private static final String REPOSITORY_TOPIC;
private final int size = 200;
+ static {
+ UUID topicId = UUID.randomUUID();
+ TOPIC = "idempt_" + topicId;
+ REPOSITORY_TOPIC = "TEST_IDEMPOTENT_" + topicId;
+ }
+
+ @BeforeAll
+ public static void createRepositoryTopic() {
+ KafkaTestUtil.createTopic(service, REPOSITORY_TOPIC, 1);
+ }
+
+ @AfterAll
+ public static void removeRepositoryTopic() {
+
kafkaAdminClient.deleteTopics(Collections.singleton(REPOSITORY_TOPIC)).all();
+ }
+
@BindToRegistry("kafkaIdempotentRepository")
private final KafkaIdempotentRepository kafkaIdempotentRepository
- = new KafkaIdempotentRepository("TEST_IDEMPOTENT",
getBootstrapServers());
+ = new KafkaIdempotentRepository(REPOSITORY_TOPIC,
getBootstrapServers());
@BeforeEach
public void before() {
- kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC,
"TEST_IDEMPOTENT")).all();
doSend(size, TOPIC);
}
@AfterEach
public void after() {
- kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC,
"TEST_IDEMPOTENT")).all();
+ kafkaAdminClient.deleteTopics(Collections.singleton(TOPIC)).all();
}
protected RouteBuilder createRouteBuilder() {
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java
index 1c89b7ec7cf..a0bb4bf6ea3 100644
---
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java
+++
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java
@@ -17,35 +17,54 @@
package org.apache.camel.component.kafka.integration;
import java.math.BigInteger;
-import java.util.Arrays;
+import java.util.Collections;
+import java.util.UUID;
import org.apache.camel.BindToRegistry;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.integration.common.KafkaTestUtil;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
public class KafkaConsumerIdempotentWithProcessorIT extends
KafkaConsumerIdempotentTestSupport {
- public static final String TOPIC = "testidemp3";
-
+ private static final String TOPIC;
+ private static final String REPOSITORY_TOPIC;
private final int size = 200;
+
+ static {
+ UUID topicId = UUID.randomUUID();
+ TOPIC = "idempt_" + topicId;
+ REPOSITORY_TOPIC = "TEST_IDEMPOTENT_" + topicId;
+ }
+
+ @BeforeAll
+ public static void createRepositoryTopic() {
+ KafkaTestUtil.createTopic(service, REPOSITORY_TOPIC, 1);
+ }
+
+ @AfterAll
+ public static void removeRepositoryTopic() {
+
kafkaAdminClient.deleteTopics(Collections.singleton(REPOSITORY_TOPIC)).all();
+ }
+
@BindToRegistry("kafkaIdempotentRepository")
private final KafkaIdempotentRepository kafkaIdempotentRepository
- = new KafkaIdempotentRepository("TEST_IDEMPOTENT",
getBootstrapServers());
+ = new KafkaIdempotentRepository(REPOSITORY_TOPIC,
getBootstrapServers());
@BeforeEach
public void before() {
- kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC,
"TEST_IDEMPOTENT")).all();
doSend(size, TOPIC);
}
@AfterEach
public void after() {
- // clean all test topics
- kafkaAdminClient.deleteTopics(Arrays.asList(TOPIC,
"TEST_IDEMPOTENT")).all();
+ // clean test topic
+ kafkaAdminClient.deleteTopics(Collections.singleton(TOPIC)).all();
}
@Override
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/common/KafkaTestUtil.java
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/common/KafkaTestUtil.java
index 8d4d15e2361..d173d8d0c49 100644
---
a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/common/KafkaTestUtil.java
+++
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/common/KafkaTestUtil.java
@@ -17,7 +17,10 @@
package org.apache.camel.component.kafka.integration.common;
+import java.util.Collections;
+import java.util.List;
import java.util.Properties;
+import java.util.concurrent.TimeUnit;
import org.apache.camel.CamelContext;
import org.apache.camel.component.kafka.KafkaComponent;
@@ -26,10 +29,18 @@ import
org.apache.camel.test.infra.kafka.services.KafkaService;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.KafkaAdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.requests.CreateTopicsRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
public final class KafkaTestUtil {
public static final String MOCK_RESULT = "mock:result";
public static final String MOCK_RESULT_BAR = "mock:resultBar";
@@ -78,4 +89,20 @@ public final class KafkaTestUtil {
kafka.getConfiguration().setBrokers(bootstrapServers);
context.addComponent("kafka", kafka);
}
+
+ public static void createTopic(KafkaService service, String topic, int
numPartitions) {
+ AdminClient kafkaAdminClient = createAdminClient(service);
+ NewTopic testTopic = new NewTopic(topic, numPartitions,
CreateTopicsRequest.NO_REPLICATION_FACTOR);
+ kafkaAdminClient.createTopics(Collections.singleton(testTopic));
+ KafkaFuture<TopicDescription> tdFuture
+ =
kafkaAdminClient.describeTopics(Collections.singletonList(topic)).topicNameValues().get(topic);
+
+ try {
+ TopicDescription td = tdFuture.get(5L, TimeUnit.SECONDS);
+ List<TopicPartitionInfo> pi = td.partitions();
+ assertEquals(numPartitions, pi.size());
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ }
}
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerIT.java
b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerIT.java
index b08ef00b21f..d758afda1fb 100644
---
a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerIT.java
+++
b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerIT.java
@@ -22,7 +22,9 @@ import org.apache.camel.BindToRegistry;
import org.apache.camel.CamelExecutionException;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.integration.common.KafkaTestUtil;
import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -32,9 +34,16 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
*/
public class KafkaIdempotentRepositoryEagerIT extends SimpleIdempotentTest {
+ private static final String REPOSITORY_TOPIC = "TEST_EAGER_" +
UUID.randomUUID();
+
+ @BeforeAll
+ public static void createRepositoryTopic() {
+ KafkaTestUtil.createTopic(service, REPOSITORY_TOPIC, 1);
+ }
+
@BindToRegistry("kafkaIdempotentRepositoryEager")
private final KafkaIdempotentRepository idempotentRepository
- = new KafkaIdempotentRepository("TEST_EAGER_" + UUID.randomUUID(),
service.getBootstrapServers());
+ = new KafkaIdempotentRepository(REPOSITORY_TOPIC,
service.getBootstrapServers());
@Override
protected RouteBuilder createRouteBuilder() {
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryNonEagerIT.java
b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryNonEagerIT.java
index 8c47892e7e1..985848008a4 100644
---
a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryNonEagerIT.java
+++
b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryNonEagerIT.java
@@ -27,6 +27,7 @@ import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.integration.common.KafkaTestUtil;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.infra.core.annotations.ContextFixture;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
@@ -42,9 +43,16 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class KafkaIdempotentRepositoryNonEagerIT extends SimpleIdempotentTest {
+ private static final String REPOSITORY_TOPIC = "TEST_NON_EAGER_" +
UUID.randomUUID();
+
+ @BeforeAll
+ public static void createRepositoryTopic() {
+ KafkaTestUtil.createTopic(service, REPOSITORY_TOPIC, 1);
+ }
+
@BindToRegistry("kafkaIdempotentRepositoryNonEager")
private final KafkaIdempotentRepository kafkaIdempotentRepository
- = new KafkaIdempotentRepository("TEST_NON_EAGER_" +
UUID.randomUUID(), service.getBootstrapServers());
+ = new KafkaIdempotentRepository(REPOSITORY_TOPIC,
service.getBootstrapServers());
@ContextFixture
public void configureKafka(CamelContext context) {
diff --git
a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java
b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java
index 1de12059224..83dca938bfc 100644
---
a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java
+++
b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryPersistenceIT.java
@@ -16,7 +16,8 @@
*/
package org.apache.camel.processor.idempotent.kafka;
-import java.util.Arrays;
+import java.util.Collections;
+import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
@@ -25,9 +26,11 @@ import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.integration.BaseKafkaTestSupport;
+import org.apache.camel.component.kafka.integration.common.KafkaTestUtil;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.infra.core.annotations.ContextFixture;
import org.apache.camel.test.infra.core.api.ConfigurableContext;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
@@ -55,16 +58,22 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class KafkaIdempotentRepositoryPersistenceIT extends
BaseKafkaTestSupport implements ConfigurableContext {
+ private static String REPOSITORY_TOPIC = "TEST_PERSISTENCE_" +
UUID.randomUUID();
private KafkaIdempotentRepository kafkaIdempotentRepository;
+ @BeforeAll
+ public static void createRepositoryTopic() {
+ KafkaTestUtil.createTopic(service, REPOSITORY_TOPIC, 1);
+ }
+
void clearTopics() {
- kafkaAdminClient.deleteTopics(Arrays.asList("TEST_PERSISTENCE")).all();
+
kafkaAdminClient.deleteTopics(Collections.singleton(REPOSITORY_TOPIC)).all();
}
@Override
@ContextFixture
public void configureContext(CamelContext context) {
- kafkaIdempotentRepository = new
KafkaIdempotentRepository("TEST_PERSISTENCE", getBootstrapServers());
+ kafkaIdempotentRepository = new
KafkaIdempotentRepository(REPOSITORY_TOPIC, getBootstrapServers());
context.getRegistry().bind("kafkaIdempotentRepositoryPersistence",
kafkaIdempotentRepository);
}
diff --git
a/test-infra/camel-test-infra-kafka/src/test/java/org/apache/camel/test/infra/kafka/services/ContainerLocalAuthKafkaService.java
b/test-infra/camel-test-infra-kafka/src/test/java/org/apache/camel/test/infra/kafka/services/ContainerLocalAuthKafkaService.java
index 1d878169cb1..e0934710c80 100644
---
a/test-infra/camel-test-infra-kafka/src/test/java/org/apache/camel/test/infra/kafka/services/ContainerLocalAuthKafkaService.java
+++
b/test-infra/camel-test-infra-kafka/src/test/java/org/apache/camel/test/infra/kafka/services/ContainerLocalAuthKafkaService.java
@@ -21,7 +21,7 @@ import
org.apache.camel.test.infra.common.services.ContainerService;
import org.apache.camel.test.infra.kafka.common.KafkaProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
import org.testcontainers.utility.MountableFile;
@@ -36,8 +36,6 @@ public class ContainerLocalAuthKafkaService implements
KafkaService, ContainerSe
ContainerLocalKafkaService.KAFKA3_IMAGE_NAME))
.asCompatibleSubstituteFor(ContainerLocalKafkaService.KAFKA3_IMAGE_NAME));
- withEmbeddedZookeeper();
-
final MountableFile mountableFile =
MountableFile.forClasspathResource(jaasConfigFile);
LOG.debug("Using mountable file at: {}",
mountableFile.getFilesystemPath());
withCopyFileToContainer(mountableFile, "/tmp/kafka-jaas.config")
diff --git
a/test-infra/camel-test-infra-kafka/src/test/java/org/apache/camel/test/infra/kafka/services/ContainerLocalKafkaService.java
b/test-infra/camel-test-infra-kafka/src/test/java/org/apache/camel/test/infra/kafka/services/ContainerLocalKafkaService.java
index 5e958377de6..4450498288f 100644
---
a/test-infra/camel-test-infra-kafka/src/test/java/org/apache/camel/test/infra/kafka/services/ContainerLocalKafkaService.java
+++
b/test-infra/camel-test-infra-kafka/src/test/java/org/apache/camel/test/infra/kafka/services/ContainerLocalKafkaService.java
@@ -22,7 +22,7 @@ import
org.apache.camel.test.infra.common.services.ContainerService;
import org.apache.camel.test.infra.kafka.common.KafkaProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.kafka.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
public class ContainerLocalKafkaService implements KafkaService,
ContainerService<KafkaContainer> {
@@ -42,8 +42,8 @@ public class ContainerLocalKafkaService implements
KafkaService, ContainerServic
}
protected KafkaContainer initContainer() {
- return new
KafkaContainer(DockerImageName.parse(System.getProperty(KafkaProperties.KAFKA_CONTAINER,
KAFKA3_IMAGE_NAME)))
- .withEmbeddedZookeeper();
+ return new KafkaContainer(
+
DockerImageName.parse(System.getProperty(KafkaProperties.KAFKA_CONTAINER,
KAFKA3_IMAGE_NAME)));
}
public String getBootstrapServers() {
@@ -79,7 +79,6 @@ public class ContainerLocalKafkaService implements
KafkaService, ContainerServic
= new KafkaContainer(
DockerImageName.parse(System.getProperty(KafkaProperties.KAFKA_CONTAINER,
KAFKA3_IMAGE_NAME))
.asCompatibleSubstituteFor(ContainerLocalKafkaService.KAFKA3_IMAGE_NAME));
- container = container.withEmbeddedZookeeper();
return new ContainerLocalKafkaService(container);
}
diff --git
a/test-infra/camel-test-infra-kafka/src/test/resources/org/apache/camel/test/infra/kafka/services/container.properties
b/test-infra/camel-test-infra-kafka/src/test/resources/org/apache/camel/test/infra/kafka/services/container.properties
index baf466772b4..d456dddc934 100644
---
a/test-infra/camel-test-infra-kafka/src/test/resources/org/apache/camel/test/infra/kafka/services/container.properties
+++
b/test-infra/camel-test-infra-kafka/src/test/resources/org/apache/camel/test/infra/kafka/services/container.properties
@@ -14,6 +14,6 @@
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
-kafka3.container=confluentinc/cp-kafka:7.4.5
+kafka3.container=apache/kafka:3.8.0
redpanda.container.image=redpandadata/redpanda:v24.1.16
strimzi.container.image=quay.io/strimzi/kafka:latest-kafka-3.7.0