This is an automated email from the ASF dual-hosted git repository.
ppalaga pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
The following commit(s) were added to refs/heads/main by this push:
new 003a1b8 Improve Kafka test coverage Fixes #2627
003a1b8 is described below
commit 003a1b87c640ad0b42c5504fd97b59ae741c3964
Author: Zineb Bendhiba <[email protected]>
AuthorDate: Wed Sep 8 15:43:39 2021 +0200
Improve Kafka test coverage
Fixes #2627
---
integration-tests/kafka/pom.xml | 22 +++++
.../component/kafka/CamelKafkaResource.java | 72 ++++++++++++++
.../quarkus/component/kafka/CamelKafkaRoutes.java | 48 +++++++++
.../component/kafka/CounterRoutePolicy.java | 36 +++++++
.../component/kafka/CustomHeaderDeserializer.java | 39 ++++++++
.../component/kafka/model/KafkaMessage.java | 49 ++++++++++
.../camel/quarkus/component/kafka/model/Price.java | 46 +++++++++
.../quarkus/component/kafka/it/CamelKafkaTest.java | 108 +++++++++++++++++++++
8 files changed, 420 insertions(+)
diff --git a/integration-tests/kafka/pom.xml b/integration-tests/kafka/pom.xml
index 5a49da4..0940faa 100644
--- a/integration-tests/kafka/pom.xml
+++ b/integration-tests/kafka/pom.xml
@@ -53,6 +53,10 @@
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-seda</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-integration-tests-support-kafka</artifactId>
</dependency>
<dependency>
@@ -75,6 +79,11 @@
<artifactId>kafka</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
<!-- The following dependencies guarantee that this module is built
after them. You can update them by running `mvn process-resources -Pformat -N`
from the source tree root directory -->
<dependency>
@@ -129,6 +138,19 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-seda-deployment</artifactId>
+ <version>${project.version}</version>
+ <type>pom</type>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
<profiles>
diff --git
a/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CamelKafkaResource.java
b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CamelKafkaResource.java
index ed46943..752bcca 100644
---
a/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CamelKafkaResource.java
+++
b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CamelKafkaResource.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.quarkus.component.kafka;
+import java.math.BigInteger;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
@@ -39,16 +40,22 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.camel.CamelContext;
+import org.apache.camel.ConsumerTemplate;
import org.apache.camel.Exchange;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.component.kafka.KafkaClientFactory;
+import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.quarkus.component.kafka.model.KafkaMessage;
+import org.apache.camel.quarkus.component.kafka.model.Price;
+import org.apache.camel.spi.RouteController;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.internals.RecordHeader;
@Path("/kafka")
@ApplicationScoped
@@ -68,6 +75,9 @@ public class CamelKafkaResource {
@Inject
ProducerTemplate producerTemplate;
+ @Inject
+ ConsumerTemplate consumerTemplate;
+
@Path("/custom/client/factory/missing")
@GET
@Produces(MediaType.TEXT_PLAIN)
@@ -127,4 +137,66 @@ public class CamelKafkaResource {
.map(m -> m.getBody(String.class))
.collect(Collectors.toList());
}
+
+ @Path("/foo/{action}")
+ @POST
+ public Response modifyFooConsumerState(@PathParam("action") String action)
throws Exception {
+ RouteController controller = context.getRouteController();
+ if (action.equals("start")) {
+ controller.startRoute("foo");
+ } else if (action.equals("stop")) {
+ controller.stopRoute("foo");
+ } else {
+ throw new IllegalArgumentException("Unknown action: " + action);
+ }
+ return Response.ok().build();
+ }
+
+ @Path("/seda/{queue}")
+ @GET
+ public String getSedaMessage(@PathParam("queue") String queueName) {
+ return consumerTemplate.receiveBody(String.format("seda:%s",
queueName), 10000, String.class);
+ }
+
+ @Path("price/{key}")
+ @POST
+ public Response postPrice(@PathParam("key") Integer key, Double price) {
+ String routeURI =
"kafka:test-serializer?autoOffsetReset=earliest&keySerializer=org.apache.kafka.common.serialization.IntegerSerializer"
+ +
+
"&valueSerializer=org.apache.kafka.common.serialization.DoubleSerializer";
+ producerTemplate.sendBodyAndHeader(routeURI, price,
KafkaConstants.KEY, key);
+ return Response.ok().build();
+ }
+
+ @Path("price")
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ public Price getPrice() {
+ Exchange exchange = consumerTemplate.receive("seda:serializer", 10000);
+ Integer key = exchange.getMessage().getHeader(KafkaConstants.KEY,
Integer.class);
+ Double price = exchange.getMessage().getBody(Double.class);
+ return new Price(key, price);
+ }
+
+ @Path("propagate/{id}")
+ @POST
+ public Response postMessageWithHeader(@PathParam("id") Integer id, String
message) {
+ try (Producer<Integer, String> producer = new
KafkaProducer<>(producerProperties)) {
+ ProducerRecord data = new ProducerRecord<>("test-propagation", id,
message);
+ data.headers().add(new RecordHeader("id",
BigInteger.valueOf(id).toByteArray()));
+ producer.send(data);
+ }
+ return Response.ok().build();
+ }
+
+ @Path("propagate")
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ public KafkaMessage getKafkaMessage() {
+ Exchange exchange = consumerTemplate.receive("seda:propagation",
10000);
+ String id = exchange.getMessage().getHeader("id", String.class);
+ String message = exchange.getMessage().getBody(String.class);
+ return new KafkaMessage(id, message);
+ }
+
}
diff --git
a/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CamelKafkaRoutes.java
b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CamelKafkaRoutes.java
index 65e752e..85b2dfa 100644
---
a/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CamelKafkaRoutes.java
+++
b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CamelKafkaRoutes.java
@@ -21,10 +21,21 @@ import javax.enterprise.inject.Produces;
import javax.inject.Named;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.component.kafka.KafkaManualCommit;
import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository;
import org.eclipse.microprofile.config.inject.ConfigProperty;
public class CamelKafkaRoutes extends RouteBuilder {
+
+ private final static String KAFKA_CONSUMER_MANUAL_COMMIT =
"kafka:manual-commit-topic"
+ + "?groupId=group1&sessionTimeoutMs=30000&autoCommitEnable=false"
+ + "&allowManualCommit=true&autoOffsetReset=earliest";
+
+ private final static String SEDA_FOO = "seda:foo";
+ private final static String SEDA_SERIALIZER = "seda:serializer";
+ private final static String SEDA_HEADER_PROPAGATION = "seda:propagation";
+
@ConfigProperty(name = "camel.component.kafka.brokers")
String brokers;
@@ -35,6 +46,13 @@ public class CamelKafkaRoutes extends RouteBuilder {
return new KafkaIdempotentRepository("idempotent-topic", brokers);
}
+ @Produces
+ @ApplicationScoped
+ @Named("customHeaderDeserializer")
+ CustomHeaderDeserializer customHeaderDeserializer() {
+ return new CustomHeaderDeserializer();
+ }
+
@Override
public void configure() throws Exception {
from("kafka:inbound?autoOffsetReset=earliest")
@@ -46,5 +64,35 @@ public class CamelKafkaRoutes extends RouteBuilder {
.messageIdRepositoryRef("kafkaIdempotentRepository")
.to("mock:idempotent-results")
.end();
+
+ CounterRoutePolicy counterRoutePolicy = new CounterRoutePolicy();
+
+ // Kafka consumer that uses manual commit
+ // it manually commits only once every 2 messages received, so that we
can test redelivery of uncommitted messages
+ from(KAFKA_CONSUMER_MANUAL_COMMIT)
+ .routeId("foo")
+ .routePolicy(counterRoutePolicy)
+ .to(SEDA_FOO)
+ .process(e -> {
+ int counter = counterRoutePolicy.getCounter();
+ if (counter % 2 != 0) {
+ KafkaManualCommit manual =
e.getMessage().getHeader(KafkaConstants.MANUAL_COMMIT,
+ KafkaManualCommit.class);
+ manual.commitSync();
+ }
+ });
+
+ // By default, keyDeserializer & valueDeserializer ==
org.apache.kafka.common.serialization.StringDeserializer
+ // and valueSerializer & keySerializer ==
org.apache.kafka.common.serialization.StringSerializer
+ // the idea here is to test setting different kinds of Deserializers
+ from("kafka:test-serializer?autoOffsetReset=earliest" +
+
"&keyDeserializer=org.apache.kafka.common.serialization.IntegerDeserializer" +
+
"&valueDeserializer=org.apache.kafka.common.serialization.DoubleDeserializer")
+ .to(SEDA_SERIALIZER);
+
+ // Header Propagation using CustomHeaderDeserializer
+
from("kafka:test-propagation?headerDeserializer=#customHeaderDeserializer")
+ .to(SEDA_HEADER_PROPAGATION);
+
}
}
diff --git
a/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CounterRoutePolicy.java
b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CounterRoutePolicy.java
new file mode 100644
index 0000000..3d37b65
--- /dev/null
+++
b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CounterRoutePolicy.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.kafka;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Route;
+import org.apache.camel.support.RoutePolicySupport;
+
+public class CounterRoutePolicy extends RoutePolicySupport {
+ private AtomicInteger counter = new AtomicInteger();
+
+ @Override
+ public void onExchangeBegin(Route route, Exchange exchange) {
+ exchange.setProperty("counter", counter.incrementAndGet());
+ }
+
+ public int getCounter() {
+ return counter.get();
+ }
+}
diff --git
a/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CustomHeaderDeserializer.java
b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CustomHeaderDeserializer.java
new file mode 100644
index 0000000..21993f4
--- /dev/null
+++
b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/CustomHeaderDeserializer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.kafka;
+
+import java.math.BigInteger;
+
+import io.quarkus.runtime.annotations.RegisterForReflection;
+import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer;
+
+@RegisterForReflection
+public class CustomHeaderDeserializer extends DefaultKafkaHeaderDeserializer {
+
+ public CustomHeaderDeserializer() {
+ }
+
+ @Override
+ public Object deserialize(String key, byte[] value) {
+ if (key.equals("id")) {
+ BigInteger bi = new BigInteger(value);
+ return String.valueOf(bi.longValue());
+ } else {
+ return super.deserialize(key, value);
+ }
+ }
+}
diff --git
a/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/model/KafkaMessage.java
b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/model/KafkaMessage.java
new file mode 100644
index 0000000..78faebe
--- /dev/null
+++
b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/model/KafkaMessage.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.kafka.model;
+
+import io.quarkus.runtime.annotations.RegisterForReflection;
+
+@RegisterForReflection
+public class KafkaMessage {
+ String id;
+ String message;
+
+ public KafkaMessage() {
+ }
+
+ public KafkaMessage(String id, String message) {
+ this.id = id;
+ this.message = message;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+}
diff --git
a/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/model/Price.java
b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/model/Price.java
new file mode 100644
index 0000000..9e9a5be
--- /dev/null
+++
b/integration-tests/kafka/src/main/java/org/apache/camel/quarkus/component/kafka/model/Price.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.kafka.model;
+
+import io.quarkus.runtime.annotations.RegisterForReflection;
+
+@RegisterForReflection
+public class Price {
+ Integer key;
+ Double price;
+
+ public Price(Integer key, Double price) {
+ this.key = key;
+ this.price = price;
+ }
+
+ public Integer getKey() {
+ return key;
+ }
+
+ public void setKey(Integer key) {
+ this.key = key;
+ }
+
+ public Double getPrice() {
+ return price;
+ }
+
+ public void setPrice(Double price) {
+ this.price = price;
+ }
+}
diff --git
a/integration-tests/kafka/src/test/java/org/apache/camel/quarkus/component/kafka/it/CamelKafkaTest.java
b/integration-tests/kafka/src/test/java/org/apache/camel/quarkus/component/kafka/it/CamelKafkaTest.java
index 6e0fdaa..627e5c5 100644
---
a/integration-tests/kafka/src/test/java/org/apache/camel/quarkus/component/kafka/it/CamelKafkaTest.java
+++
b/integration-tests/kafka/src/test/java/org/apache/camel/quarkus/component/kafka/it/CamelKafkaTest.java
@@ -18,6 +18,7 @@ package org.apache.camel.quarkus.component.kafka.it;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
@@ -25,10 +26,12 @@ import io.restassured.RestAssured;
import io.restassured.http.ContentType;
import io.restassured.path.json.JsonPath;
import org.apache.camel.quarkus.test.support.kafka.KafkaTestResource;
+import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import static io.restassured.RestAssured.given;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -86,4 +89,109 @@ public class CamelKafkaTest {
.statusCode(200)
.body(is("true"));
}
+
+ @Test
+ void testManualCommit() {
+ String body1 = UUID.randomUUID().toString();
+
+ // test consuming first message with manual commit
+ // send message that should be consumed by route with routeId = foo
+ given()
+ .contentType("text/plain")
+ .body(body1)
+ .post("/kafka/manual-commit-topic")
+ .then()
+ .statusCode(200);
+
+ // make sure the message has been consumed
+ given()
+ .contentType("text/plain")
+ .body(body1)
+ .get("/kafka/seda/foo")
+ .then()
+ .body(equalTo(body1));
+
+ String body2 = UUID.randomUUID().toString();
+
+ given()
+ .contentType("text/plain")
+ .body(body2)
+ .post("/kafka/manual-commit-topic")
+ .then()
+ .statusCode(200);
+
+ Awaitility.await().pollInterval(50, TimeUnit.MILLISECONDS).atMost(30,
TimeUnit.SECONDS).until(() -> {
+
+ // make sure the message has been consumed
+ String result = given()
+ .contentType("text/plain")
+ .get("/kafka/seda/foo")
+ .asString();
+ return body2.equals(result);
+ });
+
+ // stop foo route
+ given()
+ .contentType("text/plain")
+ .body(body1)
+ .post("/kafka/foo/stop")
+ .then()
+ .statusCode(200);
+
+ // start the foo route again
+ given()
+ .contentType("text/plain")
+ .body(body1)
+ .post("/kafka/foo/start")
+ .then()
+ .statusCode(200);
+
+ // Make sure the second message is redelivered
+ Awaitility.await().pollInterval(100, TimeUnit.MILLISECONDS).atMost(30,
TimeUnit.SECONDS).until(() -> {
+
+ // make sure the message has been consumed
+ String result = given()
+ .contentType("text/plain")
+ .get("/kafka/seda/foo")
+ .asString();
+ return body2.equals(result);
+ });
+ }
+
+ @Test
+ void testSerializers() {
+ given()
+ .contentType("text/json")
+ .body(95.59F)
+ .post("/kafka/price/1")
+ .then()
+ .statusCode(200);
+
+ // make sure the message has been consumed
+ given()
+ .contentType("text/json")
+ .get("/kafka/price")
+ .then()
+ .body("key", equalTo(1))
+ .body("price", equalTo(95.59F));
+ }
+
+ @Test
+ void testHeaderPropagation() throws InterruptedException {
+ given()
+ .contentType(ContentType.JSON)
+ .accept(ContentType.JSON)
+ .body("hello world")
+ .post("/kafka/propagate/5")
+ .then()
+ .statusCode(200);
+
+ // make sure the message has been consumed, and that the id put in the
header has been propagated
+ given()
+ .contentType("text/json")
+ .get("/kafka/propagate")
+ .then()
+ .body("id", equalTo("5"))
+ .body("message", equalTo("hello world"));
+ }
}