This is an automated email from the ASF dual-hosted git repository.

jamesnetherton pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git


The following commit(s) were added to refs/heads/main by this push:
     new 9ad75d9ead Add SQS batch consumer, JMS-like selector, KMS encryption 
and Quarkus client tests
9ad75d9ead is described below

commit 9ad75d9ead3ac1829fac833ecd4284c4875f72e3
Author: JinyuChen97 <[email protected]>
AuthorDate: Mon Mar 23 13:53:51 2026 +0000

    Add SQS batch consumer, JMS-like selector, KMS encryption and Quarkus 
client tests
    
    Fixes #2777
---
 integration-test-groups/aws2/aws2-sqs/pom.xml      |   5 +
 .../component/aws2/sqs/it/Aws2SqsResource.java     |  82 ++++++++++++++++
 .../aws2/sqs/it/BatchConsumerMessageCollector.java |  40 ++++++++
 .../aws2/sqs/it/BatchConsumerRouteBuilder.java     |  38 ++++++++
 .../aws2/sqs/it/SelectorMessageCollector.java      |  40 ++++++++
 .../aws2/sqs/it/SelectorRouteBuilder.java          |  57 +++++++++++
 .../quarkus/component/aws2/sqs/it/Aws2SqsTest.java | 104 +++++++++++++++++++++
 .../aws2/sqs/it/Aws2SqsTestEnvCustomizer.java      |  37 ++++++++
 8 files changed, 403 insertions(+)

diff --git a/integration-test-groups/aws2/aws2-sqs/pom.xml 
b/integration-test-groups/aws2/aws2-sqs/pom.xml
index 9d0af4b254..c443d66294 100644
--- a/integration-test-groups/aws2/aws2-sqs/pom.xml
+++ b/integration-test-groups/aws2/aws2-sqs/pom.xml
@@ -70,6 +70,11 @@
             <artifactId>awaitility</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>kms</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <profiles>
diff --git 
a/integration-test-groups/aws2/aws2-sqs/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsResource.java
 
b/integration-test-groups/aws2/aws2-sqs/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsResource.java
index 6bd106b8b9..f4cb6cb2cb 100644
--- 
a/integration-test-groups/aws2/aws2-sqs/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsResource.java
+++ 
b/integration-test-groups/aws2/aws2-sqs/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsResource.java
@@ -20,6 +20,7 @@ import java.net.URI;
 import java.net.URLDecoder;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
+import java.util.Optional;
 
 import jakarta.enterprise.context.ApplicationScoped;
 import jakarta.inject.Inject;
@@ -30,6 +31,7 @@ import jakarta.ws.rs.POST;
 import jakarta.ws.rs.Path;
 import jakarta.ws.rs.PathParam;
 import jakarta.ws.rs.Produces;
+import jakarta.ws.rs.QueryParam;
 import jakarta.ws.rs.core.MediaType;
 import jakarta.ws.rs.core.Response;
 import org.apache.camel.ConsumerTemplate;
@@ -47,12 +49,24 @@ public class Aws2SqsResource extends BaseAws2Resource {
     @ConfigProperty(name = "aws-sqs.queue-name")
     String queueName;
 
+    @ConfigProperty(name = "aws-sqs.batch-consumer-name")
+    String batchConsumerQueueName;
+
+    @ConfigProperty(name = "aws-sqs.kms-key-id")
+    Optional<String> kmsKeyId;
+
     @Inject
     ProducerTemplate producerTemplate;
 
     @Inject
     ConsumerTemplate consumerTemplate;
 
+    @Inject
+    BatchConsumerMessageCollector batchConsumerMessageCollector;
+
+    @Inject
+    SelectorMessageCollector selectorMessageCollector;
+
     public Aws2SqsResource() {
         super("sqs");
     }
@@ -169,6 +183,74 @@ public class Aws2SqsResource extends BaseAws2Resource {
                         String.class);
     }
 
+    @Path("batch-consumer/send")
+    @POST
+    @Consumes(MediaType.TEXT_PLAIN)
+    public Response sendToBatchConsumerQueue(String message) {
+        producerTemplate.sendBody(componentUri(batchConsumerQueueName), 
message);
+        return Response.ok().build();
+    }
+
+    @Path("batch-consumer/messages")
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    public List<String> getBatchConsumerMessages() {
+        return batchConsumerMessageCollector.getMessages();
+    }
+
+    @Path("batch-consumer/messages")
+    @DELETE
+    public Response clearBatchConsumerMessages() {
+        batchConsumerMessageCollector.clear();
+        return Response.ok().build();
+    }
+
+    @Path("kms/send/{queueName}")
+    @POST
+    @Consumes(MediaType.TEXT_PLAIN)
+    public Response sendToKmsQueue(@PathParam("queueName") String 
kmsQueueName, String message) {
+        String kmsId = kmsKeyId.orElseThrow(() -> new 
IllegalStateException("aws-sqs.kms-key-id not configured"));
+        String uri = 
String.format("aws2-sqs://%s?autoCreateQueue=true&serverSideEncryptionEnabled=true&kmsMasterKeyId=%s",
+                kmsQueueName, kmsId);
+        producerTemplate.sendBody(uri, message);
+        return Response.ok().build();
+    }
+
+    @Path("kms/receive/{queueName}")
+    @GET
+    @Produces(MediaType.TEXT_PLAIN)
+    public String receiveFromKmsQueue(@PathParam("queueName") String 
kmsQueueName) {
+        String kmsId = kmsKeyId.orElseThrow(() -> new 
IllegalStateException("aws-sqs.kms-key-id not configured"));
+        String uri = String.format(
+                
"aws2-sqs://%s?autoCreateQueue=true&serverSideEncryptionEnabled=true&kmsMasterKeyId=%s&deleteAfterRead=true",
+                kmsQueueName, kmsId);
+        return consumerTemplate.receiveBody(uri, 10000, String.class);
+    }
+
+    @Path("selector/send/{queueName}")
+    @POST
+    @Consumes(MediaType.TEXT_PLAIN)
+    public Response sendWithSelectorAttribute(@PathParam("queueName") String 
targetQueueName,
+            @QueryParam("filterType") String filterType, String message) {
+        producerTemplate.sendBodyAndHeader(componentUri(targetQueueName), 
message,
+                SelectorRouteBuilder.FILTER_ATTRIBUTE_NAME, filterType);
+        return Response.ok().build();
+    }
+
+    @Path("selector/messages")
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    public List<String> getSelectorMessages() {
+        return selectorMessageCollector.getMessages();
+    }
+
+    @Path("selector/messages")
+    @DELETE
+    public Response clearSelectorMessages() {
+        selectorMessageCollector.clear();
+        return Response.ok().build();
+    }
+
     private String componentUri() {
         return componentUri(queueName);
     }
diff --git 
a/integration-test-groups/aws2/aws2-sqs/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/BatchConsumerMessageCollector.java
 
b/integration-test-groups/aws2/aws2-sqs/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/BatchConsumerMessageCollector.java
new file mode 100644
index 0000000000..947ed6ffe5
--- /dev/null
+++ 
b/integration-test-groups/aws2/aws2-sqs/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/BatchConsumerMessageCollector.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.aws2.sqs.it;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import jakarta.enterprise.context.ApplicationScoped;
+
+@ApplicationScoped
+public class BatchConsumerMessageCollector {
+
+    private final List<String> messages = new CopyOnWriteArrayList<>();
+
+    public void collect(String message) {
+        messages.add(message);
+    }
+
+    public List<String> getMessages() {
+        return List.copyOf(messages);
+    }
+
+    public void clear() {
+        messages.clear();
+    }
+}
diff --git 
a/integration-test-groups/aws2/aws2-sqs/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/BatchConsumerRouteBuilder.java
 
b/integration-test-groups/aws2/aws2-sqs/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/BatchConsumerRouteBuilder.java
new file mode 100644
index 0000000000..9c4803aa1e
--- /dev/null
+++ 
b/integration-test-groups/aws2/aws2-sqs/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/BatchConsumerRouteBuilder.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.aws2.sqs.it;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import org.apache.camel.builder.RouteBuilder;
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+
+@ApplicationScoped
+public class BatchConsumerRouteBuilder extends RouteBuilder {
+
+    @ConfigProperty(name = "aws-sqs.batch-consumer-name")
+    String batchConsumerQueueName;
+
+    @Inject
+    BatchConsumerMessageCollector collector;
+
+    @Override
+    public void configure() {
+        from("aws2-sqs://" + batchConsumerQueueName + 
"?maxMessagesPerPoll=5&deleteAfterRead=true")
+                .process(exchange -> 
collector.collect(exchange.getIn().getBody(String.class)));
+    }
+}
diff --git 
a/integration-test-groups/aws2/aws2-sqs/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/SelectorMessageCollector.java
 
b/integration-test-groups/aws2/aws2-sqs/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/SelectorMessageCollector.java
new file mode 100644
index 0000000000..a691777337
--- /dev/null
+++ 
b/integration-test-groups/aws2/aws2-sqs/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/SelectorMessageCollector.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.aws2.sqs.it;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import jakarta.enterprise.context.ApplicationScoped;
+
+@ApplicationScoped
+public class SelectorMessageCollector {
+
+    private final List<String> messages = new CopyOnWriteArrayList<>();
+
+    public void collect(String message) {
+        messages.add(message);
+    }
+
+    public List<String> getMessages() {
+        return List.copyOf(messages);
+    }
+
+    public void clear() {
+        messages.clear();
+    }
+}
diff --git 
a/integration-test-groups/aws2/aws2-sqs/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/SelectorRouteBuilder.java
 
b/integration-test-groups/aws2/aws2-sqs/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/SelectorRouteBuilder.java
new file mode 100644
index 0000000000..04f2597754
--- /dev/null
+++ 
b/integration-test-groups/aws2/aws2-sqs/src/main/java/org/apache/camel/quarkus/component/aws2/sqs/it/SelectorRouteBuilder.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.aws2.sqs.it;
+
+import java.util.Map;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.aws2.sqs.Sqs2Constants;
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
+
+@ApplicationScoped
+public class SelectorRouteBuilder extends RouteBuilder {
+
+    static final String FILTER_ATTRIBUTE_NAME = "filter-type";
+    static final String FILTER_ATTRIBUTE_SELECTED_VALUE = "selected";
+
+    @ConfigProperty(name = "aws-sqs.selector-name")
+    String selectorQueueName;
+
+    @Inject
+    SelectorMessageCollector collector;
+
+    @Override
+    public void configure() {
+        from("aws2-sqs://" + selectorQueueName
+                + 
"?messageAttributeNames=All&deleteAfterRead=true&deleteIfFiltered=false&defaultVisibilityTimeout=0")
+                .filter(exchange -> {
+                    Map<?, ?> attrs = 
exchange.getIn().getHeader(Sqs2Constants.MESSAGE_ATTRIBUTES, Map.class);
+                    if (attrs == null) {
+                        return false;
+                    }
+                    Object attrObj = attrs.get(FILTER_ATTRIBUTE_NAME);
+                    if (!(attrObj instanceof MessageAttributeValue attrValue)) 
{
+                        return false;
+                    }
+                    return 
FILTER_ATTRIBUTE_SELECTED_VALUE.equals(attrValue.stringValue());
+                })
+                .process(exchange -> 
collector.collect(exchange.getIn().getBody(String.class)));
+    }
+}
diff --git 
a/integration-test-groups/aws2/aws2-sqs/src/test/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsTest.java
 
b/integration-test-groups/aws2/aws2-sqs/src/test/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsTest.java
index 004d8fc9e8..0a52cae001 100644
--- 
a/integration-test-groups/aws2/aws2-sqs/src/test/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsTest.java
+++ 
b/integration-test-groups/aws2/aws2-sqs/src/test/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsTest.java
@@ -23,6 +23,7 @@ import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Locale;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
@@ -36,11 +37,13 @@ import io.restassured.response.Response;
 import org.apache.camel.quarkus.test.support.aws2.Aws2LocalStack;
 import org.apache.camel.quarkus.test.support.aws2.Aws2TestResource;
 import org.apache.camel.quarkus.test.support.aws2.BaseAWs2TestSupport;
+import org.apache.commons.lang3.RandomStringUtils;
 import org.awaitility.Awaitility;
 import org.eclipse.microprofile.config.ConfigProvider;
 import org.jboss.logging.Logger;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Assumptions;
 import org.junit.jupiter.api.Test;
 
 import static org.hamcrest.Matchers.anyOf;
@@ -260,6 +263,107 @@ class Aws2SqsTest extends BaseAWs2TestSupport {
                 .asString());
     }
 
+    @Test
+    void sqsBatchConsumer() {
+        // clean previously collected messages
+        
RestAssured.delete("/aws2-sqs/batch-consumer/messages").then().statusCode(200);
+
+        final List<String> messages = new ArrayList<>();
+        for (int i = 0; i < 5; i++) {
+            String msg = "batch-consumer-" + 
UUID.randomUUID().toString().replace("-", "");
+            messages.add(msg);
+            RestAssured.given()
+                    .contentType(ContentType.TEXT)
+                    .body(msg)
+                    .post("/aws2-sqs/batch-consumer/send")
+                    .then()
+                    .statusCode(200);
+        }
+
+        Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(120, 
TimeUnit.SECONDS).until(() -> {
+            List<?> received = 
RestAssured.get("/aws2-sqs/batch-consumer/messages")
+                    .then().statusCode(200).extract().body().as(List.class);
+            return received.size() >= messages.size();
+        });
+
+        List<?> received = RestAssured.get("/aws2-sqs/batch-consumer/messages")
+                .then().statusCode(200).extract().body().as(List.class);
+        Assertions.assertEquals(messages.size(), received.size());
+        Assertions.assertTrue(received.containsAll(messages));
+    }
+
+    @Test
+    void sqsKmsEncryption() {
+        Assumptions.assumeTrue(localStack, "KMS test only runs on LocalStack");
+
+        final String kmsQueueName = "camel-quarkus-kms-"
+                + 
RandomStringUtils.secure().nextAlphanumeric(10).toLowerCase(Locale.ROOT);
+        final String msg = "kms-msg-" + 
UUID.randomUUID().toString().replace("-", "");
+
+        try {
+            RestAssured.given()
+                    .contentType(ContentType.TEXT)
+                    .body(msg)
+                    .post("/aws2-sqs/kms/send/" + kmsQueueName)
+                    .then()
+                    .statusCode(200);
+
+            Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(60, 
TimeUnit.SECONDS).until(() -> {
+                ExtractableResponse<Response> resp = 
RestAssured.get("/aws2-sqs/kms/receive/" + kmsQueueName)
+                        .then().extract();
+                return resp.statusCode() == 200 && 
msg.equals(resp.body().asString());
+            });
+        } finally {
+            deleteQueue(kmsQueueName);
+        }
+    }
+
+    @Test
+    void sqsJmsLikeSelector() {
+        final String selectorQueueName = 
ConfigProvider.getConfig().getValue("aws-sqs.selector-name", String.class);
+
+        // clean previously collected messages
+        
RestAssured.delete("/aws2-sqs/selector/messages").then().statusCode(200);
+        purgeQueue(selectorQueueName);
+
+        final String selectedMsg = "selected-" + 
UUID.randomUUID().toString().replace("-", "");
+        final String rejectedMsg = "rejected-" + 
UUID.randomUUID().toString().replace("-", "");
+
+        // send message that matches the filter (filter-type=selected)
+        RestAssured.given()
+                .contentType(ContentType.TEXT)
+                .body(selectedMsg)
+                .queryParam("filterType", 
SelectorRouteBuilder.FILTER_ATTRIBUTE_SELECTED_VALUE)
+                .post("/aws2-sqs/selector/send/" + selectorQueueName)
+                .then()
+                .statusCode(200);
+
+        // send message that does not match the filter (filter-type=rejected)
+        RestAssured.given()
+                .contentType(ContentType.TEXT)
+                .body(rejectedMsg)
+                .queryParam("filterType", "rejected")
+                .post("/aws2-sqs/selector/send/" + selectorQueueName)
+                .then()
+                .statusCode(200);
+
+        // wait for the selected message to be consumed
+        Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(60, 
TimeUnit.SECONDS).until(() -> {
+            List<?> collected = RestAssured.get("/aws2-sqs/selector/messages")
+                    .then().statusCode(200).extract().body().as(List.class);
+            return collected.contains(selectedMsg);
+        });
+
+        // verify only the selected message was collected
+        List<?> collected = RestAssured.get("/aws2-sqs/selector/messages")
+                .then().statusCode(200).extract().body().as(List.class);
+        Assertions.assertTrue(collected.contains(selectedMsg), "Selected 
message should have been collected");
+        Assertions.assertFalse(collected.contains(rejectedMsg), "Rejected 
message should not have been collected");
+
+        // purge rejected messages remaining in queue
+        purgeQueue(selectorQueueName);
+    }
+
     @Override
     public void testMethodForDefaultCredentialsProvider() {
         listQueues();
diff --git 
a/integration-test-groups/aws2/aws2-sqs/src/test/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsTestEnvCustomizer.java
 
b/integration-test-groups/aws2/aws2-sqs/src/test/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsTestEnvCustomizer.java
index 48e92beae2..f4aa714a06 100644
--- 
a/integration-test-groups/aws2/aws2-sqs/src/test/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsTestEnvCustomizer.java
+++ 
b/integration-test-groups/aws2/aws2-sqs/src/test/java/org/apache/camel/quarkus/component/aws2/sqs/it/Aws2SqsTestEnvCustomizer.java
@@ -22,6 +22,8 @@ import 
org.apache.camel.quarkus.test.support.aws2.Aws2TestEnvContext;
 import org.apache.camel.quarkus.test.support.aws2.Aws2TestEnvCustomizer;
 import org.apache.camel.quarkus.test.support.aws2.Service;
 import org.apache.commons.lang3.RandomStringUtils;
+import software.amazon.awssdk.services.kms.KmsClient;
+import software.amazon.awssdk.services.kms.model.CreateKeyRequest;
 import software.amazon.awssdk.services.sqs.SqsClient;
 import software.amazon.awssdk.services.sqs.model.CreateQueueRequest;
 import software.amazon.awssdk.services.sqs.model.DeleteQueueRequest;
@@ -32,6 +34,11 @@ public class Aws2SqsTestEnvCustomizer implements 
Aws2TestEnvCustomizer {
 
     @Override
     public Service[] localstackServices() {
+        return new Service[] { Service.SQS, Service.KMS };
+    }
+
+    @Override
+    public Service[] exportCredentialsForLocalstackServices() {
         return new Service[] { Service.SQS };
     }
 
@@ -50,6 +57,14 @@ public class Aws2SqsTestEnvCustomizer implements 
Aws2TestEnvCustomizer {
                 + 
RandomStringUtils.secure().nextAlphanumeric(49).toLowerCase(Locale.ROOT);
         envContext.property("aws-sqs.delayed-name", delayedQueueName);
 
+        final String batchConsumerQueueName = "camel-quarkus-batch-"
+                + 
RandomStringUtils.secure().nextAlphanumeric(49).toLowerCase(Locale.ROOT);
+        envContext.property("aws-sqs.batch-consumer-name", 
batchConsumerQueueName);
+
+        final String selectorQueueName = "camel-quarkus-selector-"
+                + 
RandomStringUtils.secure().nextAlphanumeric(49).toLowerCase(Locale.ROOT);
+        envContext.property("aws-sqs.selector-name", selectorQueueName);
+
         final SqsClient sqsClient = envContext.client(Service.SQS, 
SqsClient::builder);
         {
             final String queueUrl = sqsClient.createQueue(
@@ -70,10 +85,32 @@ public class Aws2SqsTestEnvCustomizer implements 
Aws2TestEnvCustomizer {
                             .build())
                     .queueUrl();
 
+            final String batchConsumerUrl = sqsClient.createQueue(
+                    CreateQueueRequest.builder()
+                            .queueName(batchConsumerQueueName)
+                            .build())
+                    .queueUrl();
+
+            final String selectorUrl = sqsClient.createQueue(
+                    CreateQueueRequest.builder()
+                            .queueName(selectorQueueName)
+                            .build())
+                    .queueUrl();
+
+            if (envContext.isLocalStack()) {
+                final KmsClient kmsClient = envContext.client(Service.KMS, 
KmsClient::builder);
+                final String kmsKeyId = kmsClient
+                        
.createKey(CreateKeyRequest.builder().description("camel-quarkus-sqs-test").build())
+                        .keyMetadata().keyId();
+                envContext.property("aws-sqs.kms-key-id", kmsKeyId);
+            }
+
             envContext.closeable(() -> {
                 
sqsClient.deleteQueue(DeleteQueueRequest.builder().queueUrl(queueUrl).build());
                 
sqsClient.deleteQueue(DeleteQueueRequest.builder().queueUrl(failingUrl).build());
                 
sqsClient.deleteQueue(DeleteQueueRequest.builder().queueUrl(deadletterUrl).build());
+                
sqsClient.deleteQueue(DeleteQueueRequest.builder().queueUrl(batchConsumerUrl).build());
+                
sqsClient.deleteQueue(DeleteQueueRequest.builder().queueUrl(selectorUrl).build());
 
                 try {
                     String url = 
sqsClient.getQueueUrl(GetQueueUrlRequest.builder().queueName(delayedQueueName).build())

Reply via email to