This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch CAMEL-20543 in repository https://gitbox.apache.org/repos/asf/camel.git
commit 84e8e92762b48ace7e93b9e25f4fa0d9c1b3bcdf Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Fri Mar 8 14:23:38 2024 +0100 CAMEL-20543 - Camel-AWS-Bedrock-Agent: Support more operations on the producer side Signed-off-by: Andrea Cosentino <anco...@gmail.com> --- .../aws2/bedrock/agent/BedrockAgentOperations.java | 4 +- .../aws2/bedrock/agent/BedrockAgentProducer.java | 64 ++++++++++++++++++++-- .../agent/integration/BedrockAgentProducerIT.java | 21 ++++++- 3 files changed, 81 insertions(+), 8 deletions(-) diff --git a/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentOperations.java b/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentOperations.java index 45b2b2cbff8..57710e6951e 100644 --- a/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentOperations.java +++ b/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentOperations.java @@ -18,5 +18,7 @@ package org.apache.camel.component.aws2.bedrock.agent; public enum BedrockAgentOperations { - startIngestionJob + startIngestionJob, + + listIngestionJobs } diff --git a/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentProducer.java b/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentProducer.java index ba479bd3d0a..de54bc34ed8 100644 --- a/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentProducer.java +++ b/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentProducer.java @@ -27,9 +27,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.awscore.exception.AwsServiceException; import software.amazon.awssdk.services.bedrockagent.BedrockAgentClient; +import software.amazon.awssdk.services.bedrockagent.model.ListIngestionJobsRequest; +import software.amazon.awssdk.services.bedrockagent.model.ListIngestionJobsResponse; import software.amazon.awssdk.services.bedrockagent.model.StartIngestionJobRequest; import software.amazon.awssdk.services.bedrockagent.model.StartIngestionJobResponse; -import software.amazon.awssdk.services.bedrockagentruntime.model.*; /** * A Producer which sends messages to the Amazon Bedrock Agent Service <a href="http://aws.amazon.com/bedrock/">AWS @@ -50,6 +51,9 @@ public class BedrockAgentProducer extends DefaultProducer { case startIngestionJob: startIngestionJob(getEndpoint().getBedrockAgentClient(), exchange); break; + case listIngestionJobs: + listIngestionJobs(getEndpoint().getBedrockAgentClient(), exchange); + break; default: throw new IllegalArgumentException("Unsupported operation"); } @@ -95,7 +99,7 @@ public class BedrockAgentProducer extends DefaultProducer { throw ase; } Message message = getMessageForResponse(exchange); - prepareResponse(result, message); + prepareIngestionJobResponse(result, message); } } else { String knowledgeBaseId; @@ -123,14 +127,66 @@ public class BedrockAgentProducer extends DefaultProducer { builder.dataSourceId(dataSourceId); StartIngestionJobResponse output = bedrockAgentClient.startIngestionJob(builder.build()); Message message = getMessageForResponse(exchange); - prepareResponse(output, message); + prepareIngestionJobResponse(output, message); + } + } + + + private void listIngestionJobs(BedrockAgentClient bedrockAgentClient, Exchange exchange) + throws InvalidPayloadException { + if (getConfiguration().isPojoRequest()) { + Object payload = exchange.getMessage().getMandatoryBody(); + if (payload instanceof ListIngestionJobsRequest) { + ListIngestionJobsResponse result; + try { + result = bedrockAgentClient.listIngestionJobs((ListIngestionJobsRequest) payload); + } catch (AwsServiceException ase) { + LOG.trace("Start Ingestion Job command returned the error code {}", ase.awsErrorDetails().errorCode()); + throw ase; + } + Message message = getMessageForResponse(exchange); + prepareListIngestionJobsResponse(result, message); + } + } else { + String knowledgeBaseId; + String dataSourceId; + ListIngestionJobsRequest.Builder builder = ListIngestionJobsRequest.builder(); + if (ObjectHelper.isEmpty(getConfiguration().getKnowledgeBaseId())) { + if (ObjectHelper.isNotEmpty(exchange.getMessage().getHeader(BedrockAgentConstants.KNOWLEDGE_BASE_ID))) { + knowledgeBaseId = exchange.getIn().getHeader(BedrockAgentConstants.KNOWLEDGE_BASE_ID, String.class); + } else { + throw new IllegalArgumentException("KnowledgeBaseId must be specified"); + } + } else { + knowledgeBaseId = getConfiguration().getKnowledgeBaseId(); + } + if (ObjectHelper.isEmpty(getConfiguration().getDataSourceId())) { + if (ObjectHelper.isNotEmpty(exchange.getMessage().getHeader(BedrockAgentConstants.DATASOURCE_ID))) { + dataSourceId = exchange.getIn().getHeader(BedrockAgentConstants.DATASOURCE_ID, String.class); + } else { + throw new IllegalArgumentException("DataSourceId must be specified"); + } + } else { + dataSourceId = getConfiguration().getDataSourceId(); + } + builder.knowledgeBaseId(knowledgeBaseId); + builder.dataSourceId(dataSourceId); + ListIngestionJobsResponse output = bedrockAgentClient.listIngestionJobs(builder.build()); + Message message = getMessageForResponse(exchange); + prepareListIngestionJobsResponse(output, message); } } - private void prepareResponse(StartIngestionJobResponse result, Message message) { + private void prepareIngestionJobResponse(StartIngestionJobResponse result, Message message) { message.setBody(result.ingestionJob().ingestionJobId()); } + private void prepareListIngestionJobsResponse(ListIngestionJobsResponse result, Message message) { + if (result.hasIngestionJobSummaries()) { + message.setBody(result.ingestionJobSummaries()); + } + } + public static Message getMessageForResponse(final Exchange exchange) { return exchange.getMessage(); } diff --git a/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/agent/integration/BedrockAgentProducerIT.java b/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/agent/integration/BedrockAgentProducerIT.java index cf370902ac7..7d75c128ebc 100644 --- a/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/agent/integration/BedrockAgentProducerIT.java +++ b/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/agent/integration/BedrockAgentProducerIT.java @@ -29,10 +29,10 @@ import org.junit.jupiter.api.condition.EnabledIfSystemProperties; import org.junit.jupiter.api.condition.EnabledIfSystemProperty; // Must be manually tested. Provide your own accessKey and secretKey using -Daws.manual.access.key and -Daws.manual.secret.key -@EnabledIfSystemProperties({ +/*@EnabledIfSystemProperties({ @EnabledIfSystemProperty(named = "aws.manual.access.key", matches = ".*", disabledReason = "Access key not provided"), @EnabledIfSystemProperty(named = "aws.manual.secret.key", matches = ".*", disabledReason = "Secret key not provided") -}) +})*/ @TestInstance(TestInstance.Lifecycle.PER_CLASS) class BedrockAgentProducerIT extends CamelTestSupport { @@ -54,13 +54,28 @@ class BedrockAgentProducerIT extends CamelTestSupport { MockEndpoint.assertIsSatisfied(context); } + @Test + public void testListIngestionJobs() throws InterruptedException { + + result.expectedMessageCount(1); + final Exchange result = template.send("direct:list_ingestion_jobs", exchange -> { + exchange.getMessage().setHeader(BedrockAgentConstants.KNOWLEDGE_BASE_ID, "QOZ68KOXTS"); + exchange.getMessage().setHeader(BedrockAgentConstants.DATASOURCE_ID, "9V85PTUEAH"); + }); + + MockEndpoint.assertIsSatisfied(context); + } + @Override protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { @Override public void configure() { from("direct:start_ingestion") - .to("aws-bedrock-agent:label?accessKey=RAW({{aws.manual.access.key}})&secretKey=RAW({{aws.manual.secret.key}}®ion=us-east-1&operation=startIngestionJob&knowledgeBaseId=QOZ68KOXTS") + .to("aws-bedrock-agent:label?useDefaultCredentialsProvider=true®ion=us-east-1&operation=startIngestionJob") + .to(result); + from("direct:list_ingestion_jobs") + .to("aws-bedrock-agent:label?useDefaultCredentialsProvider=true®ion=us-east-1&operation=listIngestionJobs") .log("${body}") .to(result); }