This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch kafka-batch-minor in repository https://gitbox.apache.org/repos/asf/camel-kamelets-examples.git
commit 821ce8f3f71e8b90c53d8377aa2e3ff9f6b8fcc4 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Thu Feb 15 14:58:20 2024 +0100 Kafka Batch Log Example: Added a little processor to show records content Signed-off-by: Andrea Cosentino <anco...@gmail.com> --- jbang/kafka-batch-log/BatchLog.java | 32 +++++++++++++++ jbang/kafka-batch-log/README.adoc | 62 +++++++++++------------------- jbang/kafka-batch-log/kafka-batch-log.yaml | 12 +++--- 3 files changed, 60 insertions(+), 46 deletions(-) diff --git a/jbang/kafka-batch-log/BatchLog.java b/jbang/kafka-batch-log/BatchLog.java new file mode 100644 index 0000000..54d89bd --- /dev/null +++ b/jbang/kafka-batch-log/BatchLog.java @@ -0,0 +1,32 @@ +package camel.example; + +import java.util.List; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.util.StringHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BatchLog implements Processor { + + private static final Logger LOG = LoggerFactory.getLogger(BatchLog.class); + + @Override + public void process(Exchange e) throws Exception { + final List<?> exchanges = e.getMessage().getBody(List.class); + + // Ensure we are actually receiving what we are asking for + if (exchanges == null || exchanges.isEmpty()) { + return; + } + + // The records from the batch are stored in a list of exchanges in the original exchange. To process, we iterate over that list + for (Object obj : exchanges) { + if (obj instanceof Exchange) { + LOG.info("Processing exchange with body {}", ((Exchange)obj).getMessage().getBody(String.class)); + } + } + } + +} diff --git a/jbang/kafka-batch-log/README.adoc b/jbang/kafka-batch-log/README.adoc index ae47d6e..31715d4 100644 --- a/jbang/kafka-batch-log/README.adoc +++ b/jbang/kafka-batch-log/README.adoc @@ -57,7 +57,7 @@ Then you can run this example using: [source,sh] ---- -$ jbang -Dcamel.jbang.version=4.4.0-SNAPSHOT camel@apache/camel run --local-kamelet-dir=<path_to_kamelets_repository> kafka-batch-log.yaml +$ jbang -Dcamel.jbang.version=4.4.0-SNAPSHOT camel@apache/camel run --local-kamelet-dir=<path_to_kamelets_repository> BatchLog.java kafka-batch-log.yaml ---- === Consumer running @@ -105,12 +105,8 @@ In the consumer log, once the pollTimeout of 40 s completes, you should see an o [source,sh] ---- -2024-02-05 09:42:07.908 INFO 21666 --- [mer[test-topic]] log-sink : Exchange[ - ExchangePattern: InOnly - Headers: {} - BodyType: java.util.ArrayList - Body: [Exchange[], Exchange[]] -] +2024-02-05 09:42:07.908 INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there +2024-02-05 09:42:07.909 INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there ---- If you check the situation for the consumer group 'my-group' you could see that the commit happened manually by using the kafka-batch-manual-commit-action. @@ -134,39 +130,25 @@ And you should immediately see the output in group of 10 records [source,sh] ---- -2024-02-05 09:50:33.947 INFO 24182 --- [mer[test-topic]] log-sink : Exchange[ - ExchangePattern: InOnly - Headers: {} - BodyType: java.util.ArrayList - Body: [Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[]] -] -2024-02-05 09:50:44.137 INFO 24182 --- [mer[test-topic]] log-sink : Exchange[ - ExchangePattern: InOnly - Headers: {} - BodyType: java.util.ArrayList - Body: [Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[]] -] -2024-02-05 09:50:54.324 INFO 24182 --- [mer[test-topic]] log-sink : Exchange[ - ExchangePattern: InOnly - Headers: {} - BodyType: java.util.ArrayList - Body: [Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[]] -] -2024-02-05 09:51:04.535 INFO 24182 --- [mer[test-topic]] log-sink : Exchange[ - ExchangePattern: InOnly - Headers: {} - BodyType: java.util.ArrayList - Body: [Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[]] -] -2024-02-05 09:51:14.747 INFO 24182 --- [mer[test-topic]] log-sink : Exchange[ - ExchangePattern: InOnly - Headers: {} - BodyType: java.util.ArrayList - Body: [Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[], Exchange[]] -] ----- - -For the aim of this example the payload of the records is not important. +. +. +. +. +2024-02-05 09:42:40.908 INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there +2024-02-05 09:42:40.909 INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there +2024-02-05 09:42:40.913 INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there +2024-02-05 09:42:40.914 INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there +2024-02-05 09:42:40.920 INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there +2024-02-05 09:42:40.928 INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there +2024-02-05 09:42:40.930 INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there +2024-02-05 09:42:40.940 INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there +2024-02-05 09:42:40.950 INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there +2024-02-05 09:42:40.955 INFO 21666 --- [mer[test-topic]] camel.example.BatchLog : Processing exchange with body hello there +. +. +. +. +---- If you check again the offset for the consumers of my-group group you'll notice we are at offset 52 now. diff --git a/jbang/kafka-batch-log/kafka-batch-log.yaml b/jbang/kafka-batch-log/kafka-batch-log.yaml index 11c25df..1a779bd 100644 --- a/jbang/kafka-batch-log/kafka-batch-log.yaml +++ b/jbang/kafka-batch-log/kafka-batch-log.yaml @@ -17,6 +17,10 @@ # camel-k: dependency=camel:kafka +- beans: + - name: batchLog + type: "#class:camel.example.BatchLog" + - route: id: "kafka-to-log" from: @@ -31,11 +35,7 @@ autoCommitEnable: false allowManualCommit: true steps: - - to: - uri: "kamelet:log-sink" - parameters: - showStreams: true - showHeaders: true - multiline: true + - bean: + ref: batchLog - to: uri: "kamelet:kafka-batch-manual-commit-action"