This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch kafka-aws-bedrock
in repository https://gitbox.apache.org/repos/asf/camel-kamelets-examples.git

commit 3b15d8bc19e8fc8693d3ea66591fef8df0663de1
Author: Andrea Cosentino <anco...@gmail.com>
AuthorDate: Thu Mar 7 07:07:21 2024 +0100

    Added an Example of Kafka to AWS Bedrock Text Sink
    
    Signed-off-by: Andrea Cosentino <anco...@gmail.com>
---
 jbang/kafka-aws-bedrock/README.adoc                | 184 +++++++++++++++++++++
 .../kafka-aws-bedrock/kafka-aws-bedrock.camel.yaml |  35 ++++
 jbang/kafka-aws-bedrock/prompt1.txt                |   1 +
 jbang/kafka-aws-bedrock/prompt2.txt                |   1 +
 4 files changed, 221 insertions(+)

diff --git a/jbang/kafka-aws-bedrock/README.adoc b/jbang/kafka-aws-bedrock/README.adoc
new file mode 100644
index 0000000..670647c
--- /dev/null
+++ b/jbang/kafka-aws-bedrock/README.adoc
@@ -0,0 +1,184 @@
+== Kafka to AWS Bedrock Text Sink Kamelet
+
+In this sample you'll use the Kafka Source Kamelet and the AWS Bedrock Text Sink Kamelet.
+
+=== Install JBang
+
+First install JBang according to https://www.jbang.dev
+
+Once JBang is installed, you should be able to run the following from a shell:
+
+[source,sh]
+----
+$ jbang --version
+----
+
+This will output the version of JBang.
+
+To run this example you can install Camel on JBang via:
+
+[source,sh]
+----
+$ jbang app install camel@apache/camel
+----
+
+This lets you run Camel JBang with the `camel` command.
+
+=== Setup Kafka instance
+
+You'll need a running Kafka cluster to point to. For example, you could use an Ansible role such as https://github.com/oscerd/kafka-ansible-role
+
+Then create a `deploy.yaml` file with the following content:
+
+```yaml
+- name: role kafka
+  hosts: localhost
+  remote_user: user
+  
+  roles:
+    - role: kafka-ansible-role
+      kafka_version: 3.6.1
+      path_dir: /home/user/
+      unarchive_dest_dir: /home/user/kafka/demo/
+      start_kafka: true
+```
+
+Then run:
+
+```shell script
+ansible-playbook -v deploy.yaml
+```
+
+This should start a Kafka instance on your local machine.
+
+=== Set up AWS Bedrock
+
+The Kamelet expects the AWS credentials to be placed in `/home/<user>/.aws/credentials`
+
+That is why the `useDefaultCredentialsProvider` option is set to true in the AWS Bedrock Text Sink configuration.
+
+=== How to run
+
+Then you can run this example using:
+
+[source,sh]
+----
+$ jbang -Dcamel.jbang.version=4.5.0-SNAPSHOT camel@apache/camel run --local-kamelet-dir=<local_path_to_kamelets> kafka-aws-bedrock.camel.yaml
+----
+
+
+=== Sending two messages
+
+In the log you should see the Kafka consumer start up and subscribe to the topic:
+
+[source,sh]
+----
+2024-03-07 06:50:53.517  INFO 11498 --- [           main] el.impl.engine.AbstractCamelContext : Routes startup (total:1 started:1 kamelets:2)
+2024-03-07 06:50:53.517  INFO 11498 --- [           main] el.impl.engine.AbstractCamelContext :     Started route1 (kamelet://kafka-not-secured-source)
+2024-03-07 06:50:53.517  INFO 11498 --- [           main] el.impl.engine.AbstractCamelContext : Apache Camel 4.5.0-SNAPSHOT (kafka-aws-bedrock) started in 817ms (build:0ms init:0ms start:817ms)
+2024-03-07 06:50:53.621  INFO 11498 --- [[bedrock-topic]] he.kafka.common.utils.AppInfoParser : Kafka version: 3.6.1
+2024-03-07 06:50:53.622  INFO 11498 --- [[bedrock-topic]] he.kafka.common.utils.AppInfoParser : Kafka commitId: 5e3c2b738d253ff5
+2024-03-07 06:50:53.622  INFO 11498 --- [[bedrock-topic]] he.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1709790653615
+2024-03-07 06:50:53.634  INFO 11498 --- [[bedrock-topic]] ort.classic.AssignmentAdapterHelper : Using NO-OP resume strategy
+2024-03-07 06:50:53.634  INFO 11498 --- [[bedrock-topic]] l.component.kafka.KafkaFetchRecords : Subscribing bedrock-topic-Thread 0 to topic bedrock-topic
+2024-03-07 06:50:53.636  INFO 11498 --- [[bedrock-topic]] afka.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Subscribed to topic(s): bedrock-topic
+2024-03-07 06:50:54.023  WARN 11498 --- [[bedrock-topic]] .apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Error while fetching metadata with correlation id 2 : {bedrock-topic=LEADER_NOT_AVAILABLE}
+2024-03-07 06:50:54.025  INFO 11498 --- [[bedrock-topic]] org.apache.kafka.clients.Metadata   : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Cluster ID: Wq0puR18So6u4dXsKJnMNg
+2024-03-07 06:50:54.140  WARN 11498 --- [[bedrock-topic]] .apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Error while fetching metadata with correlation id 4 : {bedrock-topic=LEADER_NOT_AVAILABLE}
+2024-03-07 06:50:54.263  WARN 11498 --- [[bedrock-topic]] .apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Error while fetching metadata with correlation id 6 : {bedrock-topic=LEADER_NOT_AVAILABLE}
+2024-03-07 06:50:54.371  WARN 11498 --- [[bedrock-topic]] .apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Error while fetching metadata with correlation id 8 : {bedrock-topic=LEADER_NOT_AVAILABLE}
+2024-03-07 06:50:54.784  INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Discovered group coordinator ghost:9092 (id: 2147483647 rack: null)
+2024-03-07 06:50:54.789  INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] (Re-)joining group
+2024-03-07 06:50:54.821  INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Request joining group due to: need to re-join with the given member-id: consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1-e88d4fe5-52ef-40cf-940d-42758c538276
+2024-03-07 06:50:54.823  INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException)
+2024-03-07 06:50:54.823  INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] (Re-)joining group
+2024-03-07 06:50:54.846  INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Successfully joined group with generation Generation{generationId=1, memberId='consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1-e88d4fe5-52ef-40cf-940d-42758c538276', protocol='range'}
+2024-03-07 06:50:54.857  INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Finished assignment for group at generation 1: {consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1-e88d4fe5-52ef-40cf-940d-42758c538276=Assignment(partitions=[bedrock-topic-0])}
+2024-03-07 06:50:54.934  INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Successfully synced group in generation Generation{generationId=1, memberId='consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1-e88d4fe5-52ef-40cf-940d-42758c538276', protocol='range'}
+2024-03-07 06:50:54.935  INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Notifying assignor about the new Assignment(partitions=[bedrock-topic-0])
+2024-03-07 06:50:54.937  INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Adding newly assigned partitions: bedrock-topic-0
+2024-03-07 06:50:54.962  INFO 11498 --- [[bedrock-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Found no committed offset for partition bedrock-topic-0
+2024-03-07 06:50:54.989  INFO 11498 --- [[bedrock-topic]] onsumer.internals.SubscriptionState : [Consumer clientId=consumer-0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138-1, groupId=0ccdf142-fa1d-4f14-a3b5-c86c2fb5d138] Resetting offset for partition bedrock-topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[ghost:9092 (id: 0 rack: null)], epoch=0}}.
+----
+
+In the example folder you have two different messages, prompt1.txt and prompt2.txt. The first requests the generation of JSON records and the second requests XML records. Both ask for an id field, a product name and a price.
+
+We use kcat to send the messages to Kafka, where the route consumes them.
+
+[source,sh]
+----
+$ kcat -P -b localhost:9092 -t bedrock-topic prompt1.txt
+$ kcat -P -b localhost:9092 -t bedrock-topic prompt2.txt
+----
+
+If you look in the example folder you should see a results folder now:
+
+[source,sh]
+----
+$ cd results/
+$ cat *
+----
+
+You should have two files, and printing their content should show something like:
+
+[source,sh]
+----
+```tabular-data-json
+{
+    "rows": [
+        {
+            "id": 1,
+            "product_name": "Phone 12",
+            "price": "$999"
+        },
+        {
+            "id": 2,
+            "product_name": "Phone 11",
+            "price": "$899"
+        },
+        {
+            "id": 3,
+            "product_name": "Phone X",
+            "price": "$799"
+        },
+        {
+            "id": 4,
+            "product_name": "Phone XS",
+            "price": "$699"
+        },
+        {
+            "id": 5,
+            "product_name": "Phone XR",
+            "price": "$599"
+        }
+    ]
+}
+```
+```tabular-data-xml
+<record id="1">
+  <product name="Apple iPhone 12 Pro Max">$1,099</product>
+</record>
+<record id="2">
+  <product name="Samsung Galaxy S21 Ultra 5G">$1,199</product>
+</record>
+<record id="3">
+  <product name="Google Pixel 6 Pro">$899</product>
+</record>
+<record id="4">
+  <product name="Apple iPhone 11 Pro">$999</product>
+</record>
+<record id="5">
+  <product name="Samsung Galaxy Note 20 Ultra 5G">$1,299</product>
+</record>
+```
+----
+
+=== Help and contributions
+
+If you hit any problem using Camel or have some feedback, then please
+https://camel.apache.org/community/support/[let us know].
+
+We also love contributors, so
+https://camel.apache.org/community/contributing/[get involved] :-)
+
+The Camel riders!
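
The sample results in the README above wrap each generated payload in a labelled fence (`tabular-data-json`, `tabular-data-xml`). As a minimal sketch, assuming the result files always follow that layout, the Python below splits the fenced blocks apart and parses the JSON one. The helper name `extract_fenced_blocks` and the shortened sample text are illustrative and not part of this commit.

```python
import json
import re

# Three backticks, built programmatically so this listing stays readable.
FENCE = "`" * 3


def extract_fenced_blocks(text):
    """Return a dict mapping each fence label to the text inside that fence."""
    pattern = re.compile(
        re.escape(FENCE) + r"([\w-]+)\n(.*?)\n" + re.escape(FENCE),
        re.DOTALL,
    )
    return {label: body for label, body in pattern.findall(text)}


# Shortened stand-in for the content of a file in results/ (assumption).
sample = (
    FENCE + "tabular-data-json\n"
    '{"rows": [{"id": 1, "product_name": "Phone 12", "price": "$999"}]}\n'
    + FENCE + "\n"
)

blocks = extract_fenced_blocks(sample)
rows = json.loads(blocks["tabular-data-json"])["rows"]
print(rows[0]["product_name"])
```

The model is free to change its output layout between runs, so real code would need to handle a missing or differently labelled fence.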
diff --git a/jbang/kafka-aws-bedrock/kafka-aws-bedrock.camel.yaml b/jbang/kafka-aws-bedrock/kafka-aws-bedrock.camel.yaml
new file mode 100644
index 0000000..7b0680b
--- /dev/null
+++ b/jbang/kafka-aws-bedrock/kafka-aws-bedrock.camel.yaml
@@ -0,0 +1,35 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+- route:
+    from:
+      uri: "kamelet:kafka-not-secured-source"
+      parameters:
+        topic: "bedrock-topic"
+        bootstrapServers: "localhost:9092"
+        groupId: 'my-consumer-group'
+      steps:
+      - to: 
+          uri: "kamelet:aws-bedrock-text-sink"
+          parameters:
+            region: "us-east-1"
+            useDefaultCredentialsProvider: "true"
+            modelId: "amazon.titan-text-express-v1"
+      - transform:
+          simple:
+            jq: " .results[0].outputText"
+      - to: "file:./results/"
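
The transform step in the route above selects `.results[0].outputText` from the Bedrock response before the file is written. The Python sketch below mimics that selection outside Camel. The sample response is an assumption about the shape of a Titan text reply; only the `results[0].outputText` path is confirmed by the route, and the other fields and the function name `extract_output_text` are hypothetical.

```python
import json


def extract_output_text(response_body):
    """Mimic the jq expression .results[0].outputText on a JSON string."""
    parsed = json.loads(response_body)
    return parsed["results"][0]["outputText"]


# Assumed response shape; everything except results[0].outputText is illustrative.
sample_response = json.dumps({
    "inputTextTokenCount": 20,
    "results": [
        {
            "tokenCount": 5,
            "outputText": "hello from the model",
            "completionReason": "FINISH",
        }
    ],
})

print(extract_output_text(sample_response))
```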
diff --git a/jbang/kafka-aws-bedrock/prompt1.txt b/jbang/kafka-aws-bedrock/prompt1.txt
new file mode 100644
index 0000000..d8def8d
--- /dev/null
+++ b/jbang/kafka-aws-bedrock/prompt1.txt
@@ -0,0 +1 @@
+{"inputText":"User: Can you please generate five Json record, with id field, product name and price? The output should be in JSON format.","textGenerationConfig":{"maxTokenCount":1024,"stopSequences":["User:"],"temperature":0,"topP":1}}
diff --git a/jbang/kafka-aws-bedrock/prompt2.txt b/jbang/kafka-aws-bedrock/prompt2.txt
new file mode 100644
index 0000000..0503ff7
--- /dev/null
+++ b/jbang/kafka-aws-bedrock/prompt2.txt
@@ -0,0 +1 @@
+{"inputText":"User: Can you please generate five XML record, with id field, product name and price? The output should be in XML format.","textGenerationConfig":{"maxTokenCount":1024,"stopSequences":["User:"],"temperature":0,"topP":1}}
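
Both prompt files above share the same structure: an `inputText` string prefixed with `User:` plus a `textGenerationConfig` object. As a rough sketch, a further prompt file in the same shape could be generated with the Python below. The function name `build_titan_prompt` and its defaults are illustrative; the defaults simply copy the values used in prompt1.txt and prompt2.txt.

```python
import json


def build_titan_prompt(question, max_tokens=1024, temperature=0, top_p=1):
    """Build a compact single-line JSON body shaped like prompt1.txt."""
    payload = {
        "inputText": f"User: {question}",
        "textGenerationConfig": {
            "maxTokenCount": max_tokens,
            "stopSequences": ["User:"],
            "temperature": temperature,
            "topP": top_p,
        },
    }
    # Compact separators keep the body on one line, like the committed files.
    return json.dumps(payload, separators=(",", ":"))


prompt = build_titan_prompt("Can you please generate five CSV records?")
print(prompt)
```

The printed line can be saved to a file and sent to the topic with the same kcat command the README uses.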
