This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch CAMEL-20543
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 84e8e92762b48ace7e93b9e25f4fa0d9c1b3bcdf
Author: Andrea Cosentino <anco...@gmail.com>
AuthorDate: Fri Mar 8 14:23:38 2024 +0100

    CAMEL-20543 - Camel-AWS-Bedrock-Agent: Support more operations on the producer side
    
    Signed-off-by: Andrea Cosentino <anco...@gmail.com>
---
 .../aws2/bedrock/agent/BedrockAgentOperations.java |  4 +-
 .../aws2/bedrock/agent/BedrockAgentProducer.java   | 64 ++++++++++++++++++++--
 .../agent/integration/BedrockAgentProducerIT.java  | 21 ++++++-
 3 files changed, 81 insertions(+), 8 deletions(-)

diff --git a/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentOperations.java b/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentOperations.java
index 45b2b2cbff8..57710e6951e 100644
--- a/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentOperations.java
+++ b/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentOperations.java
@@ -18,5 +18,7 @@ package org.apache.camel.component.aws2.bedrock.agent;
 
 public enum BedrockAgentOperations {
 
-    startIngestionJob
+    startIngestionJob,
+
+    listIngestionJobs
 }
diff --git a/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentProducer.java b/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentProducer.java
index ba479bd3d0a..de54bc34ed8 100644
--- a/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentProducer.java
+++ b/components/camel-aws/camel-aws-bedrock/src/main/java/org/apache/camel/component/aws2/bedrock/agent/BedrockAgentProducer.java
@@ -27,9 +27,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.awscore.exception.AwsServiceException;
 import software.amazon.awssdk.services.bedrockagent.BedrockAgentClient;
+import 
software.amazon.awssdk.services.bedrockagent.model.ListIngestionJobsRequest;
+import 
software.amazon.awssdk.services.bedrockagent.model.ListIngestionJobsResponse;
 import 
software.amazon.awssdk.services.bedrockagent.model.StartIngestionJobRequest;
 import 
software.amazon.awssdk.services.bedrockagent.model.StartIngestionJobResponse;
-import software.amazon.awssdk.services.bedrockagentruntime.model.*;
 
 /**
  * A Producer which sends messages to the Amazon Bedrock Agent Service <a href="http://aws.amazon.com/bedrock/">AWS
@@ -50,6 +51,9 @@ public class BedrockAgentProducer extends DefaultProducer {
             case startIngestionJob:
                 startIngestionJob(getEndpoint().getBedrockAgentClient(), 
exchange);
                 break;
+            case listIngestionJobs:
+                listIngestionJobs(getEndpoint().getBedrockAgentClient(), 
exchange);
+                break;
             default:
                 throw new IllegalArgumentException("Unsupported operation");
         }
@@ -95,7 +99,7 @@ public class BedrockAgentProducer extends DefaultProducer {
                     throw ase;
                 }
                 Message message = getMessageForResponse(exchange);
-                prepareResponse(result, message);
+                prepareIngestionJobResponse(result, message);
             }
         } else {
             String knowledgeBaseId;
@@ -123,14 +127,66 @@ public class BedrockAgentProducer extends DefaultProducer {
             builder.dataSourceId(dataSourceId);
             StartIngestionJobResponse output = 
bedrockAgentClient.startIngestionJob(builder.build());
             Message message = getMessageForResponse(exchange);
-            prepareResponse(output, message);
+            prepareIngestionJobResponse(output, message);
+        }
+    }
+
+
+    private void listIngestionJobs(BedrockAgentClient bedrockAgentClient, 
Exchange exchange)
+            throws InvalidPayloadException {
+        if (getConfiguration().isPojoRequest()) {
+            Object payload = exchange.getMessage().getMandatoryBody();
+            if (payload instanceof ListIngestionJobsRequest) {
+                ListIngestionJobsResponse result;
+                try {
+                    result = 
bedrockAgentClient.listIngestionJobs((ListIngestionJobsRequest) payload);
+                } catch (AwsServiceException ase) {
+                    LOG.trace("Start Ingestion Job command returned the error 
code {}", ase.awsErrorDetails().errorCode());
+                    throw ase;
+                }
+                Message message = getMessageForResponse(exchange);
+                prepareListIngestionJobsResponse(result, message);
+            }
+        } else {
+            String knowledgeBaseId;
+            String dataSourceId;
+            ListIngestionJobsRequest.Builder builder = 
ListIngestionJobsRequest.builder();
+            if (ObjectHelper.isEmpty(getConfiguration().getKnowledgeBaseId())) 
{
+                if 
(ObjectHelper.isNotEmpty(exchange.getMessage().getHeader(BedrockAgentConstants.KNOWLEDGE_BASE_ID)))
 {
+                    knowledgeBaseId = 
exchange.getIn().getHeader(BedrockAgentConstants.KNOWLEDGE_BASE_ID, 
String.class);
+                } else {
+                    throw new IllegalArgumentException("KnowledgeBaseId must 
be specified");
+                }
+            } else {
+                knowledgeBaseId = getConfiguration().getKnowledgeBaseId();
+            }
+            if (ObjectHelper.isEmpty(getConfiguration().getDataSourceId())) {
+                if 
(ObjectHelper.isNotEmpty(exchange.getMessage().getHeader(BedrockAgentConstants.DATASOURCE_ID)))
 {
+                    dataSourceId = 
exchange.getIn().getHeader(BedrockAgentConstants.DATASOURCE_ID, String.class);
+                } else {
+                    throw new IllegalArgumentException("DataSourceId must be 
specified");
+                }
+            } else {
+                dataSourceId = getConfiguration().getDataSourceId();
+            }
+            builder.knowledgeBaseId(knowledgeBaseId);
+            builder.dataSourceId(dataSourceId);
+            ListIngestionJobsResponse output = 
bedrockAgentClient.listIngestionJobs(builder.build());
+            Message message = getMessageForResponse(exchange);
+            prepareListIngestionJobsResponse(output, message);
         }
     }
 
-    private void prepareResponse(StartIngestionJobResponse result, Message 
message) {
+    private void prepareIngestionJobResponse(StartIngestionJobResponse result, 
Message message) {
         message.setBody(result.ingestionJob().ingestionJobId());
     }
 
+    private void prepareListIngestionJobsResponse(ListIngestionJobsResponse 
result, Message message) {
+        if (result.hasIngestionJobSummaries()) {
+            message.setBody(result.ingestionJobSummaries());
+        }
+    }
+
     public static Message getMessageForResponse(final Exchange exchange) {
         return exchange.getMessage();
     }
diff --git a/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/agent/integration/BedrockAgentProducerIT.java b/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/agent/integration/BedrockAgentProducerIT.java
index cf370902ac7..7d75c128ebc 100644
--- a/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/agent/integration/BedrockAgentProducerIT.java
+++ b/components/camel-aws/camel-aws-bedrock/src/test/java/org/apache/camel/component/aws2/bedrock/agent/integration/BedrockAgentProducerIT.java
@@ -29,10 +29,10 @@ import org.junit.jupiter.api.condition.EnabledIfSystemProperties;
 import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
 
 // Must be manually tested. Provide your own accessKey and secretKey using 
-Daws.manual.access.key and -Daws.manual.secret.key
-@EnabledIfSystemProperties({
+/*@EnabledIfSystemProperties({
         @EnabledIfSystemProperty(named = "aws.manual.access.key", matches = 
".*", disabledReason = "Access key not provided"),
         @EnabledIfSystemProperty(named = "aws.manual.secret.key", matches = 
".*", disabledReason = "Secret key not provided")
-})
+})*/
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 class BedrockAgentProducerIT extends CamelTestSupport {
 
@@ -54,13 +54,28 @@ class BedrockAgentProducerIT extends CamelTestSupport {
         MockEndpoint.assertIsSatisfied(context);
     }
 
+    @Test
+    public void testListIngestionJobs() throws InterruptedException {
+
+        result.expectedMessageCount(1);
+        final Exchange result = template.send("direct:list_ingestion_jobs", 
exchange -> {
+            
exchange.getMessage().setHeader(BedrockAgentConstants.KNOWLEDGE_BASE_ID, 
"QOZ68KOXTS");
+            
exchange.getMessage().setHeader(BedrockAgentConstants.DATASOURCE_ID, 
"9V85PTUEAH");
+        });
+
+        MockEndpoint.assertIsSatisfied(context);
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             @Override
             public void configure() {
                 from("direct:start_ingestion")
-                        
.to("aws-bedrock-agent:label?accessKey=RAW({{aws.manual.access.key}})&secretKey=RAW({{aws.manual.secret.key}}&region=us-east-1&operation=startIngestionJob&knowledgeBaseId=QOZ68KOXTS")
+                        
.to("aws-bedrock-agent:label?useDefaultCredentialsProvider=true&region=us-east-1&operation=startIngestionJob")
+                        .to(result);
+                from("direct:list_ingestion_jobs")
+                        
.to("aws-bedrock-agent:label?useDefaultCredentialsProvider=true&region=us-east-1&operation=listIngestionJobs")
                         .log("${body}")
                         .to(result);
             }

Reply via email to