This is an automated email from the ASF dual-hosted git repository.
jamesnetherton pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
The following commit(s) were added to refs/heads/main by this push:
new 101356b329 Adds Opensearch tests on JVM mode
101356b329 is described below
commit 101356b3298858c23b9356de884f4f442b26b2bc
Author: souvik ghosh <[email protected]>
AuthorDate: Mon Sep 1 15:58:20 2025 +0530
Adds Opensearch tests on JVM mode
---
integration-tests-jvm/opensearch/pom.xml | 37 ++++
.../opensearch/it/OpensearchResource.java | 195 +++++++++++++++++++--
.../component/opensearch/it/OpensearchRoutes.java | 50 ++++++
.../component/opensearch/it/OpensearchTest.java | 143 ++++++++++++++-
.../opensearch/it/OpensearchTestResource.java | 69 ++++++++
pom.xml | 1 +
6 files changed, 477 insertions(+), 18 deletions(-)
diff --git a/integration-tests-jvm/opensearch/pom.xml
b/integration-tests-jvm/opensearch/pom.xml
index cd65ba855e..322f4193ce 100644
--- a/integration-tests-jvm/opensearch/pom.xml
+++ b/integration-tests-jvm/opensearch/pom.xml
@@ -39,6 +39,14 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.quarkus</groupId>
+ <artifactId>quarkus-resteasy-jackson</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-direct</artifactId>
+ </dependency>
<!-- test dependencies -->
<dependency>
@@ -51,6 +59,22 @@
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>io.quarkus</groupId>
+ <artifactId>quarkus-junit4-mock</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<profiles>
@@ -63,6 +87,19 @@
</activation>
<dependencies>
<!-- The following dependencies guarantee that this module is
built after them. You can update them by running `mvn process-resources
-Pformat -N` from the source tree root directory -->
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-direct-deployment</artifactId>
+ <version>${project.version}</version>
+ <type>pom</type>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-opensearch-deployment</artifactId>
diff --git
a/integration-tests-jvm/opensearch/src/main/java/org/apache/camel/quarkus/component/opensearch/it/OpensearchResource.java
b/integration-tests-jvm/opensearch/src/main/java/org/apache/camel/quarkus/component/opensearch/it/OpensearchResource.java
index 55cbec987b..f8726e4235 100644
---
a/integration-tests-jvm/opensearch/src/main/java/org/apache/camel/quarkus/component/opensearch/it/OpensearchResource.java
+++
b/integration-tests-jvm/opensearch/src/main/java/org/apache/camel/quarkus/component/opensearch/it/OpensearchResource.java
@@ -16,15 +16,37 @@
*/
package org.apache.camel.quarkus.component.opensearch.it;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
+import jakarta.ws.rs.DELETE;
import jakarta.ws.rs.GET;
+import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
-import jakarta.ws.rs.Produces;
-import jakarta.ws.rs.core.MediaType;
+import jakarta.ws.rs.PathParam;
+import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.core.Response;
-import org.apache.camel.CamelContext;
+import org.apache.camel.FluentProducerTemplate;
+import org.apache.camel.component.opensearch.OpensearchConstants;
import org.jboss.logging.Logger;
+import org.opensearch.client.json.JsonData;
+import org.opensearch.client.opensearch._types.FieldValue;
+import org.opensearch.client.opensearch.core.BulkRequest;
+import org.opensearch.client.opensearch.core.DeleteRequest;
+import org.opensearch.client.opensearch.core.GetRequest;
+import org.opensearch.client.opensearch.core.GetResponse;
+import org.opensearch.client.opensearch.core.MsearchRequest;
+import org.opensearch.client.opensearch.core.bulk.BulkResponseItem;
+import org.opensearch.client.opensearch.core.bulk.IndexOperation;
+import org.opensearch.client.opensearch.core.mget.MultiGetResponseItem;
+import org.opensearch.client.opensearch.core.msearch.MultiSearchResponseItem;
+import org.opensearch.client.opensearch.core.msearch.MultisearchBody;
+import org.opensearch.client.opensearch.core.msearch.MultisearchHeader;
+import org.opensearch.client.opensearch.core.msearch.RequestItem;
@Path("/opensearch")
@ApplicationScoped
@@ -32,19 +54,166 @@ public class OpensearchResource {
private static final Logger LOG =
Logger.getLogger(OpensearchResource.class);
- private static final String COMPONENT_OPENSEARCH = "opensearch";
@Inject
- CamelContext context;
+ FluentProducerTemplate fluentProducerTemplate;
+
+ @POST
+ @Path("/index/{index}/{id}")
+ public Response indexDoc(String body,
+ @PathParam("index") String index,
+ @PathParam("id") String id) {
+ Map<String, Object> headers = Map.of(
+ OpensearchConstants.PARAM_INDEX_ID, id,
+ OpensearchConstants.PARAM_INDEX_NAME, index);
+ String response = fluentProducerTemplate.to("direct:indexDoc")
+ .withBody(body)
+ .withHeaders(headers)
+ .request(String.class);
+
+ if (response.isBlank() || response.isEmpty()) {
+ return Response.status(404).build();
+ }
+
+ return Response.ok().entity(response).build();
+
+ }
+
+ @POST
+ @Path("/bulk/{index}")
+ public Response bulkIndex(List<Map<String, Object>> docs,
+ @PathParam("index") String index) {
+
+ BulkRequest.Builder br = new BulkRequest.Builder();
+ for (Map<String, Object> doc : docs) {
+ String id = (String) doc.get("id");
+ br.operations(op -> op
+ .index(IndexOperation.of(idx -> idx
+ .index(index)
+ .id(id)
+ .document(JsonData.of(doc)))));
+ }
+
+ BulkResponseItem[] bulkResponse =
fluentProducerTemplate.to("direct:bulkIndex")
+ .withBody(br)
+ .request(BulkResponseItem[].class);
+
+ List<String> insertedIds = Arrays.asList(bulkResponse)
+ .stream().map(BulkResponseItem::id)
+ .collect(Collectors.toList());
+
+ return Response.ok(insertedIds).build();
+
+ }
+
+ @GET
+ @Path("/get/{index}/{id}")
+ public Response getDoc(@PathParam("index") String index,
+ @PathParam("id") String id) {
+
+ GetRequest.Builder builder = new GetRequest.Builder();
+ builder.index(index).id(id);
+
+ GetResponse<?> response = fluentProducerTemplate.to("direct:getDoc")
+ .withBody(builder)
+ .request(GetResponse.class);
+
+ if (response.source() == null || !response.found()) {
+ return Response.status(404).build();
+ }
+
+ return Response.ok().entity(response.id()).build();
+ }
+
+ @POST
+ @Path("/multiget/{index}")
+ public Response multiGet(@PathParam("index") String index,
+ List<String> body) {
+
+ MultiGetResponseItem<?> responseItem[] =
fluentProducerTemplate.to("direct:multiget")
+ .withBody(body)
+ .withHeader(OpensearchConstants.PARAM_INDEX_NAME, index)
+ .request(MultiGetResponseItem[].class);
+
+ int totalFound = Arrays.asList(responseItem).stream().map(s ->
s.result().found())
+ .collect(Collectors.toList()).size();
+ return Response.ok(totalFound).build();
+
+ }
- @Path("/load/component/opensearch")
@GET
- @Produces(MediaType.TEXT_PLAIN)
- public Response loadComponentOpensearch() throws Exception {
- /* This is an autogenerated test */
- if (context.getComponent(COMPONENT_OPENSEARCH) != null) {
- return Response.ok().build();
+ @Path("/multisearch")
+ public Response multiSearch(@QueryParam("users") String user,
+ @QueryParam("orders") String order) {
+
+ MsearchRequest.Builder builder = null;
+
+ if (user != null & order != null) {
+ builder = new MsearchRequest.Builder().searches(
+ new RequestItem.Builder()
+ .header(new
MultisearchHeader.Builder().index("users").build())
+ .body(new MultisearchBody.Builder()
+ .query(b -> b.match(m -> m.field("name")
+ .query(FieldValue.of(user))))
+ .build())
+ .build(),
+ new RequestItem.Builder()
+ .header(new
MultisearchHeader.Builder().index("orders").build())
+ .body(new MultisearchBody.Builder()
+ .query(b -> b.match(m -> m.field("item")
+ .query(FieldValue.of(order))))
+ .build())
+ .build());
+
+ }
+
+ MultiSearchResponseItem<?>[] response =
fluentProducerTemplate.to("direct:multiSearch")
+ .withBody(builder)
+ .request(MultiSearchResponseItem[].class);
+
+ if (response.length > 0) {
+ int totalFound = 0;
+ for (MultiSearchResponseItem<?> item : response) {
+ if (!item.isFailure() && item.isResult() && item.result() !=
null) {
+ totalFound++;
+ }
+ }
+ return Response.ok(totalFound).build();
}
- LOG.warnf("Could not load [%s] from the Camel context",
COMPONENT_OPENSEARCH);
- return Response.status(500, COMPONENT_OPENSEARCH + " could not be
loaded from the Camel context").build();
+ return Response.ok().build();
+
+ }
+
+ @DELETE
+ @Path("/delete/{index}/{id}")
+ public Response deleteDoc(@PathParam("index") String index,
+ @PathParam("id") String id) {
+
+ DeleteRequest.Builder builder = new DeleteRequest.Builder();
+ builder.id(id);
+ builder.index(index);
+ String response = fluentProducerTemplate.to("direct:deleteDoc")
+ .withBody(builder)
+ .request(String.class);
+
+ return Response.ok(response).build();
}
+
+ @POST
+ @Path("/search/{index}")
+ public Response search(String body, @PathParam("index") String index) {
+ String response = fluentProducerTemplate.to("direct:search")
+ .withBody(body)
+ .request(String.class);
+ return Response.ok(response).build();
+ }
+
+ @POST
+ @Path("/scroll")
+ public Response scroll(String body) {
+ String response = fluentProducerTemplate.to("direct:search")
+ .withBody(body)
+ .request(String.class);
+ return Response.ok(response).build();
+ }
+
}
diff --git
a/integration-tests-jvm/opensearch/src/main/java/org/apache/camel/quarkus/component/opensearch/it/OpensearchRoutes.java
b/integration-tests-jvm/opensearch/src/main/java/org/apache/camel/quarkus/component/opensearch/it/OpensearchRoutes.java
new file mode 100644
index 0000000000..67abea7189
--- /dev/null
+++
b/integration-tests-jvm/opensearch/src/main/java/org/apache/camel/quarkus/component/opensearch/it/OpensearchRoutes.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.opensearch.it;
+
+import org.apache.camel.builder.RouteBuilder;
+
+public class OpensearchRoutes extends RouteBuilder {
+
+ @Override
+ public void configure() throws Exception {
+ from("direct:indexDoc")
+ .to("opensearch://opensearch?operation=Index");
+
+ from("direct:bulkIndex")
+ .to("opensearch://opensearch?operation=Bulk");
+
+ from("direct:getDoc")
+ .to("opensearch://opensearch?operation=GetById");
+
+ from("direct:multiget")
+ .to("opensearch://opensearch?operation=MultiGet");
+
+ from("direct:deleteDoc")
+ .to("opensearch://opensearch?operation=Delete");
+
+ from("direct:search")
+
.to("opensearch://opensearch?operation=Search&useScroll=true&scrollKeepAliveMs=30000");
+
+ from("direct:multiSearch")
+ .to("opensearch://opensearch?operation=MultiSearch");
+
+ from("direct:scrollContinue")
+
.to("opensearch://_search/scroll?operation=Search&useScroll=true");
+ }
+
+}
diff --git
a/integration-tests-jvm/opensearch/src/test/java/org/apache/camel/quarkus/component/opensearch/it/OpensearchTest.java
b/integration-tests-jvm/opensearch/src/test/java/org/apache/camel/quarkus/component/opensearch/it/OpensearchTest.java
index db3a8b225d..d758c19321 100644
---
a/integration-tests-jvm/opensearch/src/test/java/org/apache/camel/quarkus/component/opensearch/it/OpensearchTest.java
+++
b/integration-tests-jvm/opensearch/src/test/java/org/apache/camel/quarkus/component/opensearch/it/OpensearchTest.java
@@ -16,19 +16,152 @@
*/
package org.apache.camel.quarkus.component.opensearch.it;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
-import io.restassured.RestAssured;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import static io.restassured.RestAssured.given;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
@QuarkusTest
+@QuarkusTestResource(OpensearchTestResource.class)
class OpensearchTest {
- @Test
- public void loadComponentOpensearch() {
- /* A simple autogenerated test */
- RestAssured.get("/opensearch/load/component/opensearch")
+ @ParameterizedTest
+ @MethodSource("docProvider")
+ void testIndexGetDeleteScroll(DocData doc) {
+ // ✅ Index
+ given()
+ .contentType("application/json")
+ .body(doc.json)
+ .post("/opensearch/index/{index}/{id}", doc.indexName, doc.id)
+ .then()
+ .statusCode(200)
+ .and()
+ .body(is(doc.id));
+
+ // ✅ Get
+ given()
+ .get("/opensearch/get/{index}/{id}", doc.indexName, doc.id)
+ .then()
+ .statusCode(200)
+ .body(containsString(doc.id));
+
+ // ✅ Search with scroll
+ String scrollResponse = given()
+ .contentType("application/json")
+ .body("{\"query\":{\"match_all\":{}},\"size\":1}")
+ .post("/opensearch/search/{index}", doc.indexName)
+ .then()
+ .statusCode(200)
+ .extract().asString();
+
+ String scrollId = extractScrollId(scrollResponse);
+ if (scrollId != null) {
+ given()
+ .contentType("application/json")
+ .body("{\"scroll\":\"1m\",\"scroll_id\":\"" + scrollId +
"\"}")
+ .post("/opensearch/scroll")
+ .then()
+ .statusCode(200);
+ }
+
+ // ✅ Delete
+ given()
+ .delete("/opensearch/delete/{index}/{id}", doc.indexName,
doc.id)
.then()
.statusCode(200);
}
+ @Test
+ void testBulkIndexMultiGetMultiSearch() {
+
+ // Bulk Insert
+ given()
+ .contentType("application/json")
+ .body(buildBulkDoc())
+ .post("/opensearch/bulk/users")
+ .then()
+ .statusCode(200)
+ .body("$", hasSize(4));
+
+ // Verify bulk doc using multiGet
+ List<String> ids = Arrays.asList("u1", "u2");
+
+ given()
+ .contentType("application/json")
+ .body(ids)
+ .post("/opensearch/multiget/users")
+ .then()
+ .statusCode(200)
+ .body(is("2"));
+
+ // Multi Search
+
+ String totalFound = given()
+ .contentType("application/json")
+ .get("/opensearch/multisearch?users=Alice&orders=Phone")
+ .then()
+ .statusCode(200)
+ .extract()
+ .body()
+ .asString();
+
+ totalFound.equals("2");
+
+ }
+
+ static Stream<DocData> docProvider() {
+ return Stream.of(
+ new DocData("users", "u1",
"{\"id\":\"u1\",\"name\":\"Alice\"}"),
+ new DocData("users", "u2", "{\"id\":\"u2\",\"name\":\"Bob\"}"),
+ new DocData("orders", "o1",
"{\"id\":\"o1\",\"item\":\"Laptop\"}"),
+ new DocData("orders", "o2",
"{\"id\":\"o2\",\"item\":\"Phone\"}"));
+ }
+
+ static List<Map<String, Object>> buildBulkDoc() {
+ return docProvider().map(s -> {
+ try {
+ return Map.of("index", s.indexName,
+ "id", s.id, "source", new
ObjectMapper().readValue(s.json, Map.class));
+ } catch (Exception e) {
+
+ }
+ return null;
+ })
+ .collect(Collectors.toList());
+ }
+
+ private String extractScrollId(String response) {
+ if (response.contains("\"_scroll_id\"")) {
+ int start = response.indexOf("\"_scroll_id\"") + 13;
+ int end = response.indexOf("\"", start + 1);
+ return response.substring(start + 1, end);
+ }
+ return null;
+ }
+
+ static class DocData {
+ String indexName;
+ String id;
+ String json;
+
+ DocData(String indexName, String id, String json) {
+ this.indexName = indexName;
+ this.id = id;
+ this.json = json;
+ }
+ }
+
}
diff --git
a/integration-tests-jvm/opensearch/src/test/java/org/apache/camel/quarkus/component/opensearch/it/OpensearchTestResource.java
b/integration-tests-jvm/opensearch/src/test/java/org/apache/camel/quarkus/component/opensearch/it/OpensearchTestResource.java
new file mode 100644
index 0000000000..18a7f400fd
--- /dev/null
+++
b/integration-tests-jvm/opensearch/src/test/java/org/apache/camel/quarkus/component/opensearch/it/OpensearchTestResource.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.opensearch.it;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
+import org.eclipse.microprofile.config.ConfigProvider;
+import org.jboss.logging.Logger;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.utility.DockerImageName;
+
+public class OpensearchTestResource implements
QuarkusTestResourceLifecycleManager {
+
+ private static final Logger LOG =
Logger.getLogger(OpensearchTestResource.class);
+
+ private GenericContainer<?> container;
+
+ private static final String OPENSEARCH_IMAGE =
ConfigProvider.getConfig().getValue("opensearch.container.image",
+ String.class);
+ private static final int OPENSEARCH_PORT = 9200;
+
+ @SuppressWarnings("resource")
+ @Override
+ public Map<String, String> start() {
+ try {
+ container = new
GenericContainer<>(DockerImageName.parse(OPENSEARCH_IMAGE))
+ .withEnv("discovery.type", "single-node")
+ .withExposedPorts(OPENSEARCH_PORT)
+ .withEnv("OPENSEARCH_JAVA_OPTS", "-Xms512m -Xmx512m")
+ .withEnv("plugins.security.disabled", "true");
+ container.start();
+
+ String address = container.getHost() + ":" +
container.getMappedPort(OPENSEARCH_PORT);
+ Map<String, String> config = new HashMap<>();
+ config.put("camel.component.opensearch.host-addresses", address);
+ config.put("camel.component.opensearch.enable-sniffer", "false");
+
+ return config;
+
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void stop() {
+ if (container != null) {
+ container.stop();
+
+ }
+ }
+
+}
diff --git a/pom.xml b/pom.xml
index 14beee0a6c..560864af4e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -261,6 +261,7 @@
<mariadb.container.image>mirror.gcr.io/mariadb:10.11</mariadb.container.image>
<mongodb.container.image>mirror.gcr.io/mongo:7.0</mongodb.container.image>
<nats.container.image>mirror.gcr.io/nats:2.10.18</nats.container.image>
+
<opensearch.container.image>mirror.gcr.io/opensearchproject/opensearch:2.9.0</opensearch.container.image>
<openssh-server.container.image>mirror.gcr.io/linuxserver/openssh-server:version-9.7_p1-r4</openssh-server.container.image>
<oracle-debezium.container.image>mirror.gcr.io/gvenzl/oracle-free:23-slim-faststart</oracle-debezium.container.image>
<pinecone.container.image>ghcr.io/pinecone-io/pinecone-local:v0.7.0</pinecone.container.image>