This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch kafka-azure-storage-append
in repository https://gitbox.apache.org/repos/asf/camel-kamelets-examples.git

commit d7291c16c24cb3e0e8d46833935d69c34fa850c1
Author: Andrea Cosentino <anco...@gmail.com>
AuthorDate: Tue May 7 11:05:20 2024 +0200

    Added a Kafka Batch to Azure Storage Blob Append Example
    
    Signed-off-by: Andrea Cosentino <anco...@gmail.com>
---
 jbang/kafka-batch-azure-storage-blob/README.adoc   | 161 +++++++++++++++++++++
 .../kafka-batch-azure-storage-blob.camel.yaml      |  50 +++++++
 2 files changed, 211 insertions(+)

diff --git a/jbang/kafka-batch-azure-storage-blob/README.adoc b/jbang/kafka-batch-azure-storage-blob/README.adoc
new file mode 100644
index 0000000..aeb4316
--- /dev/null
+++ b/jbang/kafka-batch-azure-storage-blob/README.adoc
@@ -0,0 +1,161 @@
+== Kafka Batch Consumer with Manual commit
+
+In this sample you'll see the Kafka Batch Source Kamelet in action and write each record of a batch to an Azure Storage Blob container in append mode.
+
+The blob name is composed of the Kafka topic name and a timestamp with minute granularity, so records consumed within the same minute are appended to the same blob.
+
+=== Install JBang
+
+First install JBang according to https://www.jbang.dev
+
+When JBang is installed then you should be able to run from a shell:
+
+[source,sh]
+----
+$ jbang --version
+----
+
+This will output the version of JBang.
+
+To run this example you can install Camel on JBang via:
+
+[source,sh]
+----
+$ jbang app install camel@apache/camel
+----
+
+This lets you run Camel JBang with the `camel` command instead of `jbang camel@apache/camel`.
+
+=== Setup Kafka instance
+
+You'll need a running Kafka cluster to point to. For example, you could use an Ansible role such as https://github.com/oscerd/kafka-ansible-role
+
+Then create a `deploy.yaml` file with the following content:
+
+```yaml
+- name: role kafka
+  hosts: localhost
+  remote_user: user
+  
+  roles:
+    - role: kafka-ansible-role
+      kafka_version: 3.4.1
+      path_dir: /home/user/
+      unarchive_dest_dir: /home/user/kafka/demo/
+      start_kafka: true
+```
+
+and then run
+
+```shell
+ansible-playbook -v deploy.yaml
+```
+
+This should start a Kafka instance for you, on your local machine.
+
+=== Set up Azure Storage Blob
+
+Create a container in your Azure storage account.
+
+The Kamelet authenticates with the storage account access key, so you'll need to copy it.
+
+Modify the kafka-batch-azure-storage-blob.camel.yaml file to set the correct `accountName`, `containerName` and `accessKey`.
+
+=== How to run
+
+Then you can run this example using:
+
+[source,sh]
+----
+$ jbang camel@apache/camel run kafka-batch-azure-storage-blob.camel.yaml
+----
+
+=== Consumer running
+
+You should see:
+
+[source,sh]
+----
+2024-05-07 10:24:40.206  INFO 45661 --- [           main] el.impl.engine.AbstractCamelContext : Routes startup (total:1 started:1 kamelets:3)
+2024-05-07 10:24:40.207  INFO 45661 --- [           main] el.impl.engine.AbstractCamelContext :     Started kafka-to-azure-storage-blob (kamelet://kafka-batch-not-secured-source)
+2024-05-07 10:24:40.208  INFO 45661 --- [           main] el.impl.engine.AbstractCamelContext : Apache Camel 4.5.0 (kafka-batch-azure-storage-blob) started in 1s722ms (build:0ms init:0ms start:1s722ms)
+2024-05-07 10:24:40.345  INFO 45661 --- [mer[test-topic]] try.internals.KafkaMetricsCollector : initializing Kafka metrics collector
+2024-05-07 10:24:40.556  INFO 45661 --- [mer[test-topic]] he.kafka.common.utils.AppInfoParser : Kafka version: 3.7.0
+2024-05-07 10:24:40.557  INFO 45661 --- [mer[test-topic]] he.kafka.common.utils.AppInfoParser : Kafka commitId: 2ae524ed625438c5
+2024-05-07 10:24:40.558  INFO 45661 --- [mer[test-topic]] he.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1715070280553
+2024-05-07 10:24:40.579  INFO 45661 --- [mer[test-topic]] ort.classic.AssignmentAdapterHelper : Using NO-OP resume strategy
+2024-05-07 10:24:40.580  INFO 45661 --- [mer[test-topic]] l.component.kafka.KafkaFetchRecords : Subscribing test-topic-Thread 0 to topic test-topic
+2024-05-07 10:24:40.588  INFO 45661 --- [mer[test-topic]] sumer.internals.LegacyKafkaConsumer : [Consumer clientId=consumer-my-group-1, groupId=my-group] Subscribed to topic(s): test-topic
+2024-05-07 10:24:41.305  WARN 45661 --- [mer[test-topic]] .apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-my-group-1, groupId=my-group] Error while fetching metadata with correlation id 2 : {test-topic=LEADER_NOT_AVAILABLE}
+2024-05-07 10:24:41.313  INFO 45661 --- [mer[test-topic]] org.apache.kafka.clients.Metadata   : [Consumer clientId=consumer-my-group-1, groupId=my-group] Cluster ID: xPmbn_GEToSBXTbzNI6gsA
+2024-05-07 10:24:41.548  WARN 45661 --- [mer[test-topic]] .apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-my-group-1, groupId=my-group] Error while fetching metadata with correlation id 5 : {test-topic=LEADER_NOT_AVAILABLE}
+2024-05-07 10:24:43.118  INFO 45661 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Discovered group coordinator ghost:9092 (id: 2147483647 rack: null)
+2024-05-07 10:24:43.127  INFO 45661 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] (Re-)joining group
+2024-05-07 10:24:43.219  INFO 45661 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Request joining group due to: need to re-join with the given member-id: consumer-my-group-1-e4ba34c2-0d2e-4bdc-b437-63f0f137b8d8
+2024-05-07 10:24:43.220  INFO 45661 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] (Re-)joining group
+2024-05-07 10:24:43.275  INFO 45661 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Successfully joined group with generation Generation{generationId=1, memberId='consumer-my-group-1-e4ba34c2-0d2e-4bdc-b437-63f0f137b8d8', protocol='range'}
+2024-05-07 10:24:43.295  INFO 45661 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Finished assignment for group at generation 1: {consumer-my-group-1-e4ba34c2-0d2e-4bdc-b437-63f0f137b8d8=Assignment(partitions=[test-topic-0])}
+2024-05-07 10:24:43.469  INFO 45661 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Successfully synced group in generation Generation{generationId=1, memberId='consumer-my-group-1-e4ba34c2-0d2e-4bdc-b437-63f0f137b8d8', protocol='range'}
+2024-05-07 10:24:43.470  INFO 45661 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Notifying assignor about the new Assignment(partitions=[test-topic-0])
+2024-05-07 10:24:43.474  INFO 45661 --- [mer[test-topic]] ls.ConsumerRebalanceListenerInvoker : [Consumer clientId=consumer-my-group-1, groupId=my-group] Adding newly assigned partitions: test-topic-0
+2024-05-07 10:24:43.535  INFO 45661 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Found no committed offset for partition test-topic-0
+2024-05-07 10:24:43.601  INFO 45661 --- [mer[test-topic]] onsumer.internals.SubscriptionState : [Consumer clientId=consumer-my-group-1, groupId=my-group] Resetting offset for partition test-topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[ghost:9092 (id: 0 rack: null)], epoch=0}}.
+----
+
+At this point we can start sending messages to the `test-topic` topic, for example with kcat:
+
+[source,sh]
+----
+for i in {1..2}; do echo "hello there" | kcat -b localhost:9092 -P -t test-topic; done
+----
+
+If you check the state of the consumer group `my-group`, you can see that the offsets were committed manually through the kafka-batch-manual-commit-action.
+
+[source,sh]
+----
+./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
+
+GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
+my-group        test-topic      0          2               2               0               -               -               -
+----
+
+You could also send 50 more records to see how the batch consumer, configured with a `batchSize` of 10, behaves:
+
+[source,sh]
+----
+for i in {1..50}; do echo "hello there" | kcat -b localhost:9092 -P -t test-topic; done
+----
+
+When the process completes, you can check your Azure Storage Blob container:
+
+[source,sh]
+----
+$ az storage blob list --account-name <account_name> --container <container_name> --query '[].name'
+[
+  "test-topic-202405071026.txt",
+  "test-topic-202405071027.txt",
+  "test-topic-202405071029.txt"
+]
+----
+
+If you check the offsets of the `my-group` consumer group again, you'll notice we are now at offset 52:
+
+[source,sh]
+----
+./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
+
+GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                              HOST            CLIENT-ID
+my-group        test-topic      0          52              52              0               -                                                        -               -
+----
+
+If you download and open the blobs in the container, you'll notice the `hello there` string repeated multiple times.
+
+=== Help and contributions
+
+If you hit any problem using Camel or have some feedback, then please
+https://camel.apache.org/community/support/[let us know].
+
+We also love contributors, so
+https://camel.apache.org/community/contributing/[get involved] :-)
+
+The Camel riders!
diff --git a/jbang/kafka-batch-azure-storage-blob/kafka-batch-azure-storage-blob.camel.yaml b/jbang/kafka-batch-azure-storage-blob/kafka-batch-azure-storage-blob.camel.yaml
new file mode 100644
index 0000000..5ec4cb8
--- /dev/null
+++ b/jbang/kafka-batch-azure-storage-blob/kafka-batch-azure-storage-blob.camel.yaml
@@ -0,0 +1,50 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+# camel-k: dependency=camel:kafka
+
+- route:
+    id: "kafka-to-azure-storage-blob"
+    from:
+      uri: "kamelet:kafka-batch-not-secured-source"
+      parameters:
+        bootstrapServers: "localhost:9092"
+        topic: "test-topic"
+        consumerGroup: 'my-group'
+        batchSize: 10
+        pollTimeout: 40000
+        maxPollIntervalMs: 60000
+        autoCommitEnable: false
+        allowManualCommit: true
+        deserializeHeaders: true
+      steps:
+        - split:
+            simple: "${body}"
+            steps:
+              - setHeader:
+                  name: "file"
+                  simple: "${body.getMessage().getHeader('kafka.TOPIC')}-${date:now:yyyyMMddHHmm}.txt"
+              - setBody:
+                  simple: "${body.getMessage().getBody()}"
+              - to:
+                  uri: "kamelet:azure-storage-blob-append-sink"
+                  parameters:
+                    accountName: "test1234567891012"
+                    containerName: "kafkabatch"
+                    accessKey: "accessKey"            
+        - to:
+            uri: "kamelet:kafka-batch-manual-commit-action"
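
As a quick sanity check of the naming scheme used by the `file` header in the route above, the blob name can be rebuilt in a shell. This is only a sketch and not part of the commit: the `topic`, `stamp` and `blob_name` variables are hypothetical, and it assumes that the `yyyyMMddHHmm` pattern is evaluated in the local time zone, as `date` does by default.

```sh
#!/bin/sh
# Hypothetical sketch: rebuild the blob name that the route derives from
# "<kafka.TOPIC>-<yyyyMMddHHmm>.txt".
topic="test-topic"              # same topic as in the route
stamp="$(date +%Y%m%d%H%M)"     # current local time, minute granularity
blob_name="${topic}-${stamp}.txt"
echo "$blob_name"
```

The printed name can be compared with the entries returned by the `az storage blob list` command shown in the README. Records consumed during the same minute should map to the same name.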
