This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch kafka-azure-storage-append
in repository https://gitbox.apache.org/repos/asf/camel-kamelets-examples.git
commit d7291c16c24cb3e0e8d46833935d69c34fa850c1
Author: Andrea Cosentino <anco...@gmail.com>
AuthorDate: Tue May 7 11:05:20 2024 +0200

    Added a Kafka Batch to Azure Storage Blob Append Example

    Signed-off-by: Andrea Cosentino <anco...@gmail.com>
---
 jbang/kafka-batch-azure-storage-blob/README.adoc   | 161 +++++++++++++++++++++
 .../kafka-batch-azure-storage-blob.camel.yaml      |  50 +++++++
 2 files changed, 211 insertions(+)

diff --git a/jbang/kafka-batch-azure-storage-blob/README.adoc b/jbang/kafka-batch-azure-storage-blob/README.adoc
new file mode 100644
index 0000000..aeb4316
--- /dev/null
+++ b/jbang/kafka-batch-azure-storage-blob/README.adoc
@@ -0,0 +1,161 @@
+== Kafka Batch Consumer with Manual Commit
+
+In this example you'll see the Kafka Batch Source Kamelet in action and write the individual records of each batch into an Azure Storage Blob container in append mode.
+
+The file name is composed of the Kafka topic name and a timestamp, so records are collected with per-minute granularity.
+
+=== Install JBang
+
+First install JBang according to https://www.jbang.dev
+
+Once JBang is installed, you should be able to run the following from a shell:
+
+[source,sh]
+----
+$ jbang --version
+----
+
+This will output the version of JBang.
+
+To run this example you can install Camel on JBang via:
+
+[source,sh]
+----
+$ jbang app install camel@apache/camel
+----
+
+This allows you to run Camel JBang with the `camel` command, as shown below.
+
+=== Setup Kafka instance
+
+You'll need a running Kafka cluster to point to.
+In this case you could use an Ansible role like https://github.com/oscerd/kafka-ansible-role
+
+Set up a file deploy.yaml with the following content:
+
+[source,yaml]
+----
+- name: role kafka
+  hosts: localhost
+  remote_user: user
+
+  roles:
+    - role: kafka-ansible-role
+      kafka_version: 3.4.1
+      path_dir: /home/user/
+      unarchive_dest_dir: /home/user/kafka/demo/
+      start_kafka: true
+----
+
+and then run:
+
+[source,sh]
+----
+ansible-playbook -v deploy.yaml
+----
+
+This should start a Kafka instance on your local machine.
+
+=== Set up Azure Storage Blob
+
+Create a container on your personal account.
+
+The Kamelet will use the access key for this purpose, so you'll need to copy it.
+
+Modify the kafka-batch-azure-storage-blob.camel.yaml file to set the correct account name, container name and access key.
+
+=== How to run
+
+Then you can run this example using:
+
+[source,sh]
+----
+$ jbang camel@apache/camel run kafka-batch-azure-storage-blob.camel.yaml
+----
+
+=== Consumer running
+
+You should see:
+
+[source,sh]
+----
+2024-05-07 10:24:40.206 INFO 45661 --- [ main] el.impl.engine.AbstractCamelContext : Routes startup (total:1 started:1 kamelets:3)
+2024-05-07 10:24:40.207 INFO 45661 --- [ main] el.impl.engine.AbstractCamelContext : Started kafka-to-azure-storage-blob (kamelet://kafka-batch-not-secured-source)
+2024-05-07 10:24:40.208 INFO 45661 --- [ main] el.impl.engine.AbstractCamelContext : Apache Camel 4.5.0 (kafka-batch-azure-storage-blob) started in 1s722ms (build:0ms init:0ms start:1s722ms)
+2024-05-07 10:24:40.345 INFO 45661 --- [mer[test-topic]] try.internals.KafkaMetricsCollector : initializing Kafka metrics collector
+2024-05-07 10:24:40.556 INFO 45661 --- [mer[test-topic]] he.kafka.common.utils.AppInfoParser : Kafka version: 3.7.0
+2024-05-07 10:24:40.557 INFO 45661 --- [mer[test-topic]] he.kafka.common.utils.AppInfoParser : Kafka commitId: 2ae524ed625438c5
+2024-05-07 10:24:40.558 INFO 45661 --- [mer[test-topic]] he.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1715070280553
+2024-05-07 10:24:40.579 INFO 45661 --- [mer[test-topic]] ort.classic.AssignmentAdapterHelper : Using NO-OP resume strategy
+2024-05-07 10:24:40.580 INFO 45661 --- [mer[test-topic]] l.component.kafka.KafkaFetchRecords : Subscribing test-topic-Thread 0 to topic test-topic
+2024-05-07 10:24:40.588 INFO 45661 --- [mer[test-topic]] sumer.internals.LegacyKafkaConsumer : [Consumer clientId=consumer-my-group-1, groupId=my-group] Subscribed to topic(s): test-topic
+2024-05-07 10:24:41.305 WARN 45661 --- [mer[test-topic]] .apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-my-group-1, groupId=my-group] Error while fetching metadata with correlation id 2 : {test-topic=LEADER_NOT_AVAILABLE}
+2024-05-07 10:24:41.313 INFO 45661 --- [mer[test-topic]] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-my-group-1, groupId=my-group] Cluster ID: xPmbn_GEToSBXTbzNI6gsA
+2024-05-07 10:24:41.548 WARN 45661 --- [mer[test-topic]] .apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-my-group-1, groupId=my-group] Error while fetching metadata with correlation id 5 : {test-topic=LEADER_NOT_AVAILABLE}
+2024-05-07 10:24:43.118 INFO 45661 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Discovered group coordinator ghost:9092 (id: 2147483647 rack: null)
+2024-05-07 10:24:43.127 INFO 45661 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] (Re-)joining group
+2024-05-07 10:24:43.219 INFO 45661 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Request joining group due to: need to re-join with the given member-id: consumer-my-group-1-e4ba34c2-0d2e-4bdc-b437-63f0f137b8d8
+2024-05-07 10:24:43.220 INFO 45661 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] (Re-)joining group
+2024-05-07 10:24:43.275 INFO 45661 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Successfully joined group with generation Generation{generationId=1, memberId='consumer-my-group-1-e4ba34c2-0d2e-4bdc-b437-63f0f137b8d8', protocol='range'}
+2024-05-07 10:24:43.295 INFO 45661 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Finished assignment for group at generation 1: {consumer-my-group-1-e4ba34c2-0d2e-4bdc-b437-63f0f137b8d8=Assignment(partitions=[test-topic-0])}
+2024-05-07 10:24:43.469 INFO 45661 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Successfully synced group in generation Generation{generationId=1, memberId='consumer-my-group-1-e4ba34c2-0d2e-4bdc-b437-63f0f137b8d8', protocol='range'}
+2024-05-07 10:24:43.470 INFO 45661 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Notifying assignor about the new Assignment(partitions=[test-topic-0])
+2024-05-07 10:24:43.474 INFO 45661 --- [mer[test-topic]] ls.ConsumerRebalanceListenerInvoker : [Consumer clientId=consumer-my-group-1, groupId=my-group] Adding newly assigned partitions: test-topic-0
+2024-05-07 10:24:43.535 INFO 45661 --- [mer[test-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-1, groupId=my-group] Found no committed offset for partition test-topic-0
+2024-05-07 10:24:43.601 INFO 45661 --- [mer[test-topic]] onsumer.internals.SubscriptionState : [Consumer clientId=consumer-my-group-1, groupId=my-group] Resetting offset for partition test-topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[ghost:9092 (id: 0 rack: null)], epoch=0}}.
+----
+
+At this point we should start sending messages to the test-topic topic.
+We could use kcat for this:
+
+[source,sh]
+----
+for i in {1..2}; do echo "hello there" | kcat -b localhost:9092 -P -t test-topic; done
+----
+
+If you check the situation for the consumer group 'my-group', you can see that the commit happened manually through the kafka-batch-manual-commit-action Kamelet:
+
+[source,sh]
+----
+./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
+
+GROUP      TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
+my-group   test-topic  0          2               2               0    -            -     -
+----
+
+You could also try sending 50 records to see how the batch consumer behaves:
+
+[source,sh]
+----
+for i in {1..50}; do echo "hello there" | kcat -b localhost:9092 -P -t test-topic; done
+----
+
+When the process completes, you can check your Azure Storage Blob container:
+
+[source,sh]
+----
+$ az storage blob list --account-name <account_name> --container <container_name> --query '[].name'
+[
+  "test-topic-202405071026.txt",
+  "test-topic-202405071027.txt",
+  "test-topic-202405071029.txt"
+]
+----
+
+If you check the offsets for the my-group consumer group again, you'll notice we are now at offset 52.
+
+If you download and open the blobs in the container, you'll see the "hello there" string repeated multiple times.
+
+[source,sh]
+----
+./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
+
+GROUP      TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
+my-group   test-topic  0          52              52              0    -            -     -
+----
+
+=== Help and contributions
+
+If you hit any problem using Camel or have some feedback, then please
+https://camel.apache.org/community/support/[let us know].
+
+We also love contributors, so
+https://camel.apache.org/community/contributing/[get involved] :-)
+
+The Camel riders!
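[Editor's note] The per-minute blob naming used by the route (the Kafka topic name plus a `yyyyMMddHHmm` timestamp) can be sketched in plain Python. This is an illustrative approximation of the route's `file` header expression, not Camel code; the function name is made up for the example:

```python
from datetime import datetime

def blob_name(topic: str, when: datetime) -> str:
    # Mirrors the simple expression topic + "-" + yyyyMMddHHmm + ".txt":
    # every record consumed within the same minute appends to the same blob.
    return f"{topic}-{when.strftime('%Y%m%d%H%M')}.txt"

print(blob_name("test-topic", datetime(2024, 5, 7, 10, 26, 13)))
# test-topic-202405071026.txt
```

Two records produced thirty seconds apart can still land in the same blob, which is why the blob listing above shows one file per active minute rather than one file per record.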
diff --git a/jbang/kafka-batch-azure-storage-blob/kafka-batch-azure-storage-blob.camel.yaml b/jbang/kafka-batch-azure-storage-blob/kafka-batch-azure-storage-blob.camel.yaml
new file mode 100644
index 0000000..5ec4cb8
--- /dev/null
+++ b/jbang/kafka-batch-azure-storage-blob/kafka-batch-azure-storage-blob.camel.yaml
@@ -0,0 +1,50 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+# camel-k: dependency=camel:kafka
+
+- route:
+    id: "kafka-to-azure-storage-blob"
+    from:
+      uri: "kamelet:kafka-batch-not-secured-source"
+      parameters:
+        bootstrapServers: "localhost:9092"
+        topic: "test-topic"
+        consumerGroup: 'my-group'
+        batchSize: 10
+        pollTimeout: 40000
+        maxPollIntervalMs: 60000
+        autoCommitEnable: false
+        allowManualCommit: true
+        deserializeHeaders: true
+      steps:
+        - split:
+            simple: "${body}"
+            steps:
+              - setHeader:
+                  name: "file"
+                  simple: "${body.getMessage().getHeader('kafka.TOPIC')}-${date:now:yyyyMMddHHmm}.txt"
+              - setBody:
+                  simple: "${body.getMessage().getBody()}"
+              - to:
+                  uri: "kamelet:azure-storage-blob-append-sink"
+                  parameters:
+                    accountName: "test1234567891012"
+                    containerName: "kafkabatch"
+                    accessKey: "accessKey"
+              - to:
+                  uri: "kamelet:kafka-batch-manual-commit-action"
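[Editor's note] The route above consumes records in batches of at most `batchSize: 10` and commits manually once each batch is processed, which is how 52 produced records end with the consumer group at offset 52. A minimal Python sketch of this chunking (illustrative only; actual batch boundaries also depend on `pollTimeout` and arrival timing, not just count):

```python
def batches(records, batch_size=10):
    # Yield consecutive chunks of at most batch_size records, roughly as the
    # kafka-batch source does before the route splits and appends each record.
    for i in range(0, len(records), batch_size):
        yield records[i : i + batch_size]

# 2 initial messages plus 50 more, as in the README example:
sizes = [len(batch) for batch in batches(list(range(52)))]
print(sizes)  # [10, 10, 10, 10, 10, 2]
```

After each yielded chunk is written to the blob sink, the kafka-batch-manual-commit-action commits the offsets for that chunk, so the group's CURRENT-OFFSET advances in batch-sized steps.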