This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-kamelets-examples.git
The following commit(s) were added to refs/heads/main by this push: new cb6cf4f Added an example of Kafka Apicurio Kamelet with Keycloak cb6cf4f is described below commit cb6cf4fbe7f353af7c3564caaff0703c9cadf288 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Fri Feb 23 11:58:22 2024 +0100 Added an example of Kafka Apicurio Kamelet with Keycloak Signed-off-by: Andrea Cosentino <anco...@gmail.com> --- .../README.md | 102 ++++ .../application.properties | 9 + .../kafka-apicurio-kamelet.yaml | 45 ++ .../kafka-apicurio-producer-kamelet.yaml | 53 ++ .../kafka-producer/pom.xml | 84 +++ .../kafka-producer/src/main/avro/order.avsc | 33 + .../main/java/com/acme/example/kafka/Produce.java | 85 +++ .../java/com/acme/example/kafka/models/Order.java | 673 +++++++++++++++++++++ 8 files changed, 1084 insertions(+) diff --git a/jbang/kafka-apicurio-secured-schema-registry/README.md b/jbang/kafka-apicurio-secured-schema-registry/README.md new file mode 100644 index 0000000..da2a6ea --- /dev/null +++ b/jbang/kafka-apicurio-secured-schema-registry/README.md @@ -0,0 +1,102 @@ +# Example for consuming from Kafka with the usage of Apicurio Schema Registry secured with Keycloak and Avro + +You'll need a running Kafka instance and an Apicurio Registry + +## Kafka instance + +You could use a plain Kafka archive or use an Ansible role + +## Apicurio Registry settings + +Follow the guide here to have a fully running Apicurio Registry with Keycloak: https://github.com/Apicurio/apicurio-registry/tree/main/distro/docker-compose#docker-compose-and-quarkus-based-installation + +Once everything is up and running, access the Keycloak instance located at `http://YOUR_IP:8080` + +The username and password is admin/admin. + +Select the realm `registry` and create a new user called `registry-account` with password `registry`. + +The client Id from Keycloak will be `registry-api`. + +In the client page, select credentials and copy the client secret value. + +In `application.properties` file copy the client secret value into `keycloak.client.secret`. + +The rest of the options could be used as-is. + +## Configure the applications + +In `application.properties` set the Kafka instance address. + +## Produce to Kafka. + +Run [`Produce.java`](./kafka-producer/src/main/java/com/acme/example/kafka/Produce.java) to produce a message to the Kafka. + +```bash +mvn compile exec:java -Dexec.mainClass="com.acme.example.kafka.Produce" +``` + +## Produce to Kafka without Kamelets + +To consume messages using a Camel route, first install the kafka-producer maven project: +```bash +cd kafka-producer +mvn clean install +``` +then run: +```bash +jbang run camel@apache/camel run --local-kamelet-dir=<local_path_to_camel_kamelets> kafka-apicurio-kamelet.yaml +``` + +You should see something like + +```bash +2024-02-23 11:53:11.840 INFO 39989 --- [ main] el.impl.engine.AbstractCamelContext : Routes startup (started:3) +2024-02-23 11:53:11.841 INFO 39989 --- [ main] el.impl.engine.AbstractCamelContext : Started kafka-to-apicurio-log (kamelet://kafka-not-secured-apicurio-registry-source) +2024-02-23 11:53:11.841 INFO 39989 --- [ main] el.impl.engine.AbstractCamelContext : Started kafka-not-secured-apicurio-registry-source-1 (kafka://my-topic) +2024-02-23 11:53:11.841 INFO 39989 --- [ main] el.impl.engine.AbstractCamelContext : Started log-sink-2 (kamelet://source) +2024-02-23 11:53:11.841 INFO 39989 --- [ main] el.impl.engine.AbstractCamelContext : Apache Camel 4.4.0 (kafka-apicurio-kamelet) started in 216ms (build:0ms init:0ms start:216ms) +2024-02-23 11:53:12.083 INFO 39989 --- [sumer[my-topic]] fka.clients.consumer.ConsumerConfig : These configurations '[apicurio.registry.avroDatumProvider, apicurio.auth.service.url, apicurio.auth.realm, apicurio.auth.password, apicurio.auth.client.id, apicurio.auth.client.secret, apicurio.registry.url, apicurio.auth.username]' were supplied but are not used yet. +2024-02-23 11:53:12.085 INFO 39989 --- [sumer[my-topic]] he.kafka.common.utils.AppInfoParser : Kafka version: 3.5.1 +2024-02-23 11:53:12.085 INFO 39989 --- [sumer[my-topic]] he.kafka.common.utils.AppInfoParser : Kafka commitId: 2c6fb6c54472e90a +2024-02-23 11:53:12.085 INFO 39989 --- [sumer[my-topic]] he.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1708685592083 +2024-02-23 11:53:12.090 INFO 39989 --- [sumer[my-topic]] ort.classic.AssignmentAdapterHelper : Using NO-OP resume strategy +2024-02-23 11:53:12.090 INFO 39989 --- [sumer[my-topic]] l.component.kafka.KafkaFetchRecords : Subscribing my-topic-Thread 0 to topic my-topic +2024-02-23 11:53:12.091 INFO 39989 --- [sumer[my-topic]] afka.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-2b2daf0d-5ce6-4fe3-8f3d-253d7cd92160-1, groupId=2b2daf0d-5ce6-4fe3-8f3d-253d7cd92160] Subscribed to topic(s): my-topic +2024-02-23 11:53:12.328 INFO 39989 --- [sumer[my-topic]] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-2b2daf0d-5ce6-4fe3-8f3d-253d7cd92160-1, groupId=2b2daf0d-5ce6-4fe3-8f3d-253d7cd92160] Cluster ID: tM9KiIzXSHOsmkLYHKEB_g +2024-02-23 11:53:12.329 INFO 39989 --- [sumer[my-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-2b2daf0d-5ce6-4fe3-8f3d-253d7cd92160-1, groupId=2b2daf0d-5ce6-4fe3-8f3d-253d7cd92160] Discovered group coordinator ghost:9092 (id: 2147483647 rack: null) +2024-02-23 11:53:12.333 INFO 39989 --- [sumer[my-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-2b2daf0d-5ce6-4fe3-8f3d-253d7cd92160-1, groupId=2b2daf0d-5ce6-4fe3-8f3d-253d7cd92160] (Re-)joining group +2024-02-23 11:53:12.343 INFO 39989 --- [sumer[my-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-2b2daf0d-5ce6-4fe3-8f3d-253d7cd92160-1, groupId=2b2daf0d-5ce6-4fe3-8f3d-253d7cd92160] Request joining group due to: need to re-join with the given member-id: consumer-2b2daf0d-5ce6-4fe3-8f3d-253d7cd92160-1-08829d9f-0409-424b-8a8b-3dfd94379fba +2024-02-23 11:53:12.343 INFO 39989 --- [sumer[my-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-2b2daf0d-5ce6-4fe3-8f3d-253d7cd92160-1, groupId=2b2daf0d-5ce6-4fe3-8f3d-253d7cd92160] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException) +2024-02-23 11:53:12.343 INFO 39989 --- [sumer[my-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-2b2daf0d-5ce6-4fe3-8f3d-253d7cd92160-1, groupId=2b2daf0d-5ce6-4fe3-8f3d-253d7cd92160] (Re-)joining group +2024-02-23 11:53:12.346 INFO 39989 --- [sumer[my-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-2b2daf0d-5ce6-4fe3-8f3d-253d7cd92160-1, groupId=2b2daf0d-5ce6-4fe3-8f3d-253d7cd92160] Successfully joined group with generation Generation{generationId=1, memberId='consumer-2b2daf0d-5ce6-4fe3-8f3d-253d7cd92160-1-08829d9f-0409-424b-8a8b-3dfd94379fba', protocol='range'} +2024-02-23 11:53:12.357 INFO 39989 --- [sumer[my-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-2b2daf0d-5ce6-4fe3-8f3d-253d7cd92160-1, groupId=2b2daf0d-5ce6-4fe3-8f3d-253d7cd92160] Finished assignment for group at generation 1: {consumer-2b2daf0d-5ce6-4fe3-8f3d-253d7cd92160-1-08829d9f-0409-424b-8a8b-3dfd94379fba=Assignment(partitions=[my-topic-0])} +2024-02-23 11:53:12.369 INFO 39989 --- [sumer[my-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-2b2daf0d-5ce6-4fe3-8f3d-253d7cd92160-1, groupId=2b2daf0d-5ce6-4fe3-8f3d-253d7cd92160] Successfully synced group in generation Generation{generationId=1, memberId='consumer-2b2daf0d-5ce6-4fe3-8f3d-253d7cd92160-1-08829d9f-0409-424b-8a8b-3dfd94379fba', protocol='range'} +2024-02-23 11:53:12.370 INFO 39989 --- [sumer[my-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-2b2daf0d-5ce6-4fe3-8f3d-253d7cd92160-1, groupId=2b2daf0d-5ce6-4fe3-8f3d-253d7cd92160] Notifying assignor about the new Assignment(partitions=[my-topic-0]) +2024-02-23 11:53:12.373 INFO 39989 --- [sumer[my-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-2b2daf0d-5ce6-4fe3-8f3d-253d7cd92160-1, groupId=2b2daf0d-5ce6-4fe3-8f3d-253d7cd92160] Adding newly assigned partitions: my-topic-0 +2024-02-23 11:53:12.379 INFO 39989 --- [sumer[my-topic]] sumer.internals.ConsumerCoordinator : [Consumer clientId=consumer-2b2daf0d-5ce6-4fe3-8f3d-253d7cd92160-1, groupId=2b2daf0d-5ce6-4fe3-8f3d-253d7cd92160] Found no committed offset for partition my-topic-0 +2024-02-23 11:53:12.388 INFO 39989 --- [sumer[my-topic]] onsumer.internals.SubscriptionState : [Consumer clientId=consumer-2b2daf0d-5ce6-4fe3-8f3d-253d7cd92160-1, groupId=2b2daf0d-5ce6-4fe3-8f3d-253d7cd92160] Resetting offset for partition my-topic-0 to position FetchPosition{offset=21, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[ghost:9092 (id: 0 rack: null)], epoch=0}}. + +``` + +and after a message has been produced to Kafka you should see + +```bash +2024-02-23 11:53:27.247 INFO 39989 --- [sumer[my-topic]] log-sink : Exchange[ + ExchangePattern: InOnly + Headers: {apicurio.value.encoding=BINARY, apicurio.value.globalId=, CamelMessageTimestamp=1708685606746, kafka.HEADERS=RecordHeaders(headers = [RecordHeader(key = apicurio.value.globalId, value = [0, 0, 0, 0, 0, 0, 0, 2]), RecordHeader(key = apicurio.value.encoding, value = [66, 73, 78, 65, 82, 89])], isReadOnly = false), kafka.KEY=key, kafka.OFFSET=21, kafka.PARTITION=0, kafka.TIMESTAMP=1708685606746, kafka.TOPIC=my-topic} + BodyType: org.apache.avro.generic.GenericData.Record + Body: {"orderId": 1, "itemId": "item", "userId": "user", "quantity": 3.0, "description": "A really nice item I do love"} +``` + +## Produce to Kafka with Kamelets + +You might also want to try out the specialized Kamelet sink for Apicurio Registry + +Follow the same approach but run + +then run: +```bash +jbang run camel@apache/camel run --local-kamelet-dir=<local_path_to_camel_kamelets> kafka-apicurio-producer-kamelet.yaml +``` + diff --git a/jbang/kafka-apicurio-secured-schema-registry/application.properties b/jbang/kafka-apicurio-secured-schema-registry/application.properties new file mode 100644 index 0000000..5fe655a --- /dev/null +++ b/jbang/kafka-apicurio-secured-schema-registry/application.properties @@ -0,0 +1,9 @@ +bootstrap.servers=localhost:9092 +topic=my-topic +schema.registry.url=http://localhost:8081/apis/registry/v2 +keycloak.service.url=http://localhost:8080/ +keycloak.realm=registry +keycloak.client.id=registry-api +keycloak.client.secret=<client_secret> +keycloak.apicurio.username=registry-account +keycloak.apicurio.password=registry diff --git a/jbang/kafka-apicurio-secured-schema-registry/kafka-apicurio-kamelet.yaml b/jbang/kafka-apicurio-secured-schema-registry/kafka-apicurio-kamelet.yaml new file mode 100644 index 0000000..2a7f5bc --- /dev/null +++ b/jbang/kafka-apicurio-secured-schema-registry/kafka-apicurio-kamelet.yaml @@ -0,0 +1,45 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- + +# camel-k: dependency=mvn:com.acme.example:kafka-apicurio-producer:0.1 + +- beans: + - name: order + type: "#class:com.acme.example.kafka.models.Order" + +- route: + id: "kafka-to-apicurio-log" + from: + uri: "kamelet:kafka-not-secured-apicurio-registry-source" + parameters: + topic: "{{topic}}" + bootstrapServers: "{{bootstrap.servers}}" + groupId: 'my-consumer-group' + apicurioRegistryUrl: '{{schema.registry.url}}' + apicurioAuthServiceUrl: '{{keycloak.service.url}}' + apicurioAuthRealm: '{{keycloak.realm}}' + apicurioAuthClientId: '{{keycloak.client.id}}' + apicurioAuthClientSecret: '{{keycloak.client.secret}}' + apicurioAuthUsername: '{{keycloak.apicurio.username}}' + apicurioAuthPassword: '{{keycloak.apicurio.password}}' + steps: + - to: + uri: "kamelet:log-sink" + parameters: + showStreams: true + showHeaders: true + multiline: true diff --git a/jbang/kafka-apicurio-secured-schema-registry/kafka-apicurio-producer-kamelet.yaml b/jbang/kafka-apicurio-secured-schema-registry/kafka-apicurio-producer-kamelet.yaml new file mode 100644 index 0000000..ae1ba1a --- /dev/null +++ b/jbang/kafka-apicurio-secured-schema-registry/kafka-apicurio-producer-kamelet.yaml @@ -0,0 +1,53 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- + +# camel-k: dependency=mvn:com.acme.example:kafka-apicurio-producer:0.1 + +- beans: + - name: order + type: "#class:com.acme.example.kafka.models.Order" + properties: + orderId: 1 + itemId: "123" + userId: "oscerd" + quantity: 3 + description: "My item" + +- route: + from: + uri: "kamelet:timer-source" + parameters: + message: '{"id":"1","message":"Camel Rocks"}' + contentType: "application/json" + repeatCount: 10 + steps: + - set-body: + simple: "${ref:order}" + - to: + uri: "kamelet:kafka-not-secured-apicurio-registry-sink" + parameters: + topic: "{{topic}}" + bootstrapServers: "{{bootstrap.servers}}" + groupId: 'my-consumer-group' + apicurioRegistryUrl: '{{schema.registry.url}}' + apicurioRegistryUrl: '{{schema.registry.url}}' + apicurioAuthServiceUrl: '{{keycloak.service.url}}' + apicurioAuthRealm: '{{keycloak.realm}}' + apicurioAuthClientId: '{{keycloak.client.id}}' + apicurioAuthClientSecret: '{{keycloak.client.secret}}' + apicurioAuthUsername: '{{keycloak.apicurio.username}}' + apicurioAuthPassword: '{{keycloak.apicurio.password}}' diff --git a/jbang/kafka-apicurio-secured-schema-registry/kafka-producer/pom.xml b/jbang/kafka-apicurio-secured-schema-registry/kafka-producer/pom.xml new file mode 100644 index 0000000..378c362 --- /dev/null +++ b/jbang/kafka-apicurio-secured-schema-registry/kafka-producer/pom.xml @@ -0,0 +1,84 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <groupId>com.acme.example</groupId> + <artifactId>kafka-apicurio-producer</artifactId> + <packaging>jar</packaging> + <version>0.1</version> + <name>kafka-apicurio-prod</name> + <url>http://maven.apache.org</url> + <dependencies> + + <dependency> + <groupId>io.apicurio</groupId> + <artifactId>apicurio-registry-serdes-avro-serde</artifactId> + <version>2.4.12.Final</version> + <exclusions> + <exclusion> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>3.5.1</version> + </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + <version>1.11.3</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>2.0.7</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + <version>2.0.7</version> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>3.8.1</version> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.11.0</version> + <configuration> + <source>17</source> + <target>17</target> + </configuration> + </plugin> + + <plugin> + <groupId>org.apache.avro</groupId> + <artifactId>avro-maven-plugin</artifactId> + <version>1.11.3</version> + <executions> + <execution> + <phase>generate-sources</phase> + <goals> + <goal>schema</goal> + </goals> + <configuration> + <sourceDirectory>src/main/avro</sourceDirectory> + <outputDirectory>${project.basedir}/src/main/java</outputDirectory> + <stringType>String</stringType> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> diff --git a/jbang/kafka-apicurio-secured-schema-registry/kafka-producer/src/main/avro/order.avsc b/jbang/kafka-apicurio-secured-schema-registry/kafka-producer/src/main/avro/order.avsc new file mode 100644 index 0000000..e41753e --- /dev/null +++ b/jbang/kafka-apicurio-secured-schema-registry/kafka-producer/src/main/avro/order.avsc @@ -0,0 +1,33 @@ +{ + "doc": "Fact schema of an order", + "fields": [ + { + "doc": "Unique id of the order.", + "name": "orderId", + "type": "int" + }, + { + "doc": "Id of the ordered item.", + "name": "itemId", + "type": "string" + }, + { + "doc": "Id of the user who ordered the item.", + "name": "userId", + "type": "string" + }, + { + "doc": "How much was ordered.", + "name": "quantity", + "type": "double" + }, + { + "doc": "Description of item.", + "name": "description", + "type": "string" + } + ], + "name": "Order", + "namespace": "com.acme.example.kafka.models", + "type": "record" +} diff --git a/jbang/kafka-apicurio-secured-schema-registry/kafka-producer/src/main/java/com/acme/example/kafka/Produce.java b/jbang/kafka-apicurio-secured-schema-registry/kafka-producer/src/main/java/com/acme/example/kafka/Produce.java new file mode 100644 index 0000000..28b342d --- /dev/null +++ b/jbang/kafka-apicurio-secured-schema-registry/kafka-producer/src/main/java/com/acme/example/kafka/Produce.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.acme.example.kafka; + +import com.acme.example.kafka.models.Order; +import io.apicurio.registry.serde.SerdeConfig; +import io.apicurio.registry.serde.avro.AvroKafkaSerializer; +import io.apicurio.registry.serde.avro.AvroKafkaSerdeConfig; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.StringSerializer; +import io.apicurio.registry.serde.avro.ReflectAvroDatumProvider; +import io.apicurio.registry.rest.client.exception.RestClientException; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class Produce { + + public static final String DEFAULT_PROPERTIES_PATH = "../application.properties"; + + public static void main(String[] args) throws IOException { + String propertiesPath = DEFAULT_PROPERTIES_PATH; + if (args.length >= 1) { + propertiesPath = args[0]; + } + + Properties properties = new Properties(); + properties.load(Files.newInputStream(Paths.get(propertiesPath))); + + String registryUrl = properties.getProperty("schema.registry.url"); + String authServiceUrl = properties.getProperty("keycloak.service.url"); + String authRealm = properties.getProperty("keycloak.realm"); + String clientId = properties.getProperty("keycloak.client.id"); + String clientSecret = properties.getProperty("keycloak.client.secret"); + String apicurioUsername = properties.getProperty("keycloak.apicurio.username"); + String apicurioPassword = properties.getProperty("keycloak.apicurio.password"); + + properties.put(SerdeConfig.REGISTRY_URL, registryUrl); + properties.put(SerdeConfig.AUTH_SERVICE_URL, authServiceUrl); + properties.put(SerdeConfig.AUTH_REALM, authRealm); + properties.put(SerdeConfig.AUTH_CLIENT_ID, clientId); + properties.put(SerdeConfig.AUTH_USERNAME, apicurioUsername); + properties.put(SerdeConfig.AUTH_PASSWORD, apicurioPassword); + properties.put(SerdeConfig.AUTH_CLIENT_SECRET, clientSecret); + properties.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, Boolean.TRUE); + properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class); + + + properties.put(AvroKafkaSerdeConfig.AVRO_DATUM_PROVIDER, ReflectAvroDatumProvider.class.getName()); + + try (KafkaProducer<String, Order> orderProducer = new KafkaProducer<>(properties)) { + Order order = new Order(1, "item", "user", 3.0, "A really nice item I do love"); + String topic = properties.getProperty("topic"); + ProducerRecord<String, Order> record = new ProducerRecord<>(topic, "key", order); + RecordMetadata result = orderProducer.send(record).get(5, TimeUnit.SECONDS); + System.out.println("Sent record with offset " + result.offset()); + } catch (ExecutionException | InterruptedException | TimeoutException | RestClientException e ) { + e.printStackTrace(); + } + } +} diff --git a/jbang/kafka-apicurio-secured-schema-registry/kafka-producer/src/main/java/com/acme/example/kafka/models/Order.java b/jbang/kafka-apicurio-secured-schema-registry/kafka-producer/src/main/java/com/acme/example/kafka/models/Order.java new file mode 100644 index 0000000..c559447 --- /dev/null +++ b/jbang/kafka-apicurio-secured-schema-registry/kafka-producer/src/main/java/com/acme/example/kafka/models/Order.java @@ -0,0 +1,673 @@ +/** + * Autogenerated by Avro + * + * DO NOT EDIT DIRECTLY + */ +package com.acme.example.kafka.models; + +import org.apache.avro.generic.GenericArray; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.util.Utf8; +import org.apache.avro.message.BinaryMessageEncoder; +import org.apache.avro.message.BinaryMessageDecoder; +import org.apache.avro.message.SchemaStore; + +/** Fact schema of an order */ +@org.apache.avro.specific.AvroGenerated +public class Order extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { + private static final long serialVersionUID = -8676937297320921983L; + + + public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Order\",\"namespace\":\"com.acme.example.kafka.models\",\"doc\":\"Fact schema of an order\",\"fields\":[{\"name\":\"orderId\",\"type\":\"int\",\"doc\":\"Unique id of the order.\"},{\"name\":\"itemId\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"doc\":\"Id of the ordered item.\"},{\"name\":\"userId\",\"type\":{\"type\":\"string\",\"avro.jav [...] + public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } + + private static final SpecificData MODEL$ = new SpecificData(); + + private static final BinaryMessageEncoder<Order> ENCODER = + new BinaryMessageEncoder<>(MODEL$, SCHEMA$); + + private static final BinaryMessageDecoder<Order> DECODER = + new BinaryMessageDecoder<>(MODEL$, SCHEMA$); + + /** + * Return the BinaryMessageEncoder instance used by this class. + * @return the message encoder used by this class + */ + public static BinaryMessageEncoder<Order> getEncoder() { + return ENCODER; + } + + /** + * Return the BinaryMessageDecoder instance used by this class. + * @return the message decoder used by this class + */ + public static BinaryMessageDecoder<Order> getDecoder() { + return DECODER; + } + + /** + * Create a new BinaryMessageDecoder instance for this class that uses the specified {@link SchemaStore}. + * @param resolver a {@link SchemaStore} used to find schemas by fingerprint + * @return a BinaryMessageDecoder instance for this class backed by the given SchemaStore + */ + public static BinaryMessageDecoder<Order> createDecoder(SchemaStore resolver) { + return new BinaryMessageDecoder<>(MODEL$, SCHEMA$, resolver); + } + + /** + * Serializes this Order to a ByteBuffer. + * @return a buffer holding the serialized data for this instance + * @throws java.io.IOException if this instance could not be serialized + */ + public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException { + return ENCODER.encode(this); + } + + /** + * Deserializes a Order from a ByteBuffer. + * @param b a byte buffer holding serialized data for an instance of this class + * @return a Order instance decoded from the given buffer + * @throws java.io.IOException if the given bytes could not be deserialized into an instance of this class + */ + public static Order fromByteBuffer( + java.nio.ByteBuffer b) throws java.io.IOException { + return DECODER.decode(b); + } + + /** Unique id of the order. */ + private int orderId; + /** Id of the ordered item. */ + private java.lang.String itemId; + /** Id of the user who ordered the item. */ + private java.lang.String userId; + /** How much was ordered. */ + private double quantity; + /** Description of item. */ + private java.lang.String description; + + /** + * Default constructor. Note that this does not initialize fields + * to their default values from the schema. If that is desired then + * one should use <code>newBuilder()</code>. + */ + public Order() {} + + /** + * All-args constructor. + * @param orderId Unique id of the order. + * @param itemId Id of the ordered item. + * @param userId Id of the user who ordered the item. + * @param quantity How much was ordered. + * @param description Description of item. + */ + public Order(java.lang.Integer orderId, java.lang.String itemId, java.lang.String userId, java.lang.Double quantity, java.lang.String description) { + this.orderId = orderId; + this.itemId = itemId; + this.userId = userId; + this.quantity = quantity; + this.description = description; + } + + @Override + public org.apache.avro.specific.SpecificData getSpecificData() { return MODEL$; } + + @Override + public org.apache.avro.Schema getSchema() { return SCHEMA$; } + + // Used by DatumWriter. Applications should not call. + @Override + public java.lang.Object get(int field$) { + switch (field$) { + case 0: return orderId; + case 1: return itemId; + case 2: return userId; + case 3: return quantity; + case 4: return description; + default: throw new IndexOutOfBoundsException("Invalid index: " + field$); + } + } + + // Used by DatumReader. Applications should not call. + @Override + @SuppressWarnings(value="unchecked") + public void put(int field$, java.lang.Object value$) { + switch (field$) { + case 0: orderId = (java.lang.Integer)value$; break; + case 1: itemId = value$ != null ? value$.toString() : null; break; + case 2: userId = value$ != null ? value$.toString() : null; break; + case 3: quantity = (java.lang.Double)value$; break; + case 4: description = value$ != null ? value$.toString() : null; break; + default: throw new IndexOutOfBoundsException("Invalid index: " + field$); + } + } + + /** + * Gets the value of the 'orderId' field. + * @return Unique id of the order. + */ + public int getOrderId() { + return orderId; + } + + + /** + * Sets the value of the 'orderId' field. + * Unique id of the order. + * @param value the value to set. + */ + public void setOrderId(int value) { + this.orderId = value; + } + + /** + * Gets the value of the 'itemId' field. + * @return Id of the ordered item. + */ + public java.lang.String getItemId() { + return itemId; + } + + + /** + * Sets the value of the 'itemId' field. + * Id of the ordered item. + * @param value the value to set. + */ + public void setItemId(java.lang.String value) { + this.itemId = value; + } + + /** + * Gets the value of the 'userId' field. + * @return Id of the user who ordered the item. + */ + public java.lang.String getUserId() { + return userId; + } + + + /** + * Sets the value of the 'userId' field. + * Id of the user who ordered the item. + * @param value the value to set. + */ + public void setUserId(java.lang.String value) { + this.userId = value; + } + + /** + * Gets the value of the 'quantity' field. + * @return How much was ordered. + */ + public double getQuantity() { + return quantity; + } + + + /** + * Sets the value of the 'quantity' field. + * How much was ordered. + * @param value the value to set. + */ + public void setQuantity(double value) { + this.quantity = value; + } + + /** + * Gets the value of the 'description' field. + * @return Description of item. + */ + public java.lang.String getDescription() { + return description; + } + + + /** + * Sets the value of the 'description' field. + * Description of item. + * @param value the value to set. + */ + public void setDescription(java.lang.String value) { + this.description = value; + } + + /** + * Creates a new Order RecordBuilder. + * @return A new Order RecordBuilder + */ + public static com.acme.example.kafka.models.Order.Builder newBuilder() { + return new com.acme.example.kafka.models.Order.Builder(); + } + + /** + * Creates a new Order RecordBuilder by copying an existing Builder. + * @param other The existing builder to copy. + * @return A new Order RecordBuilder + */ + public static com.acme.example.kafka.models.Order.Builder newBuilder(com.acme.example.kafka.models.Order.Builder other) { + if (other == null) { + return new com.acme.example.kafka.models.Order.Builder(); + } else { + return new com.acme.example.kafka.models.Order.Builder(other); + } + } + + /** + * Creates a new Order RecordBuilder by copying an existing Order instance. + * @param other The existing instance to copy. + * @return A new Order RecordBuilder + */ + public static com.acme.example.kafka.models.Order.Builder newBuilder(com.acme.example.kafka.models.Order other) { + if (other == null) { + return new com.acme.example.kafka.models.Order.Builder(); + } else { + return new com.acme.example.kafka.models.Order.Builder(other); + } + } + + /** + * RecordBuilder for Order instances. + */ + @org.apache.avro.specific.AvroGenerated + public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<Order> + implements org.apache.avro.data.RecordBuilder<Order> { + + /** Unique id of the order. */ + private int orderId; + /** Id of the ordered item. */ + private java.lang.String itemId; + /** Id of the user who ordered the item. */ + private java.lang.String userId; + /** How much was ordered. */ + private double quantity; + /** Description of item. */ + private java.lang.String description; + + /** Creates a new Builder */ + private Builder() { + super(SCHEMA$, MODEL$); + } + + /** + * Creates a Builder by copying an existing Builder. + * @param other The existing Builder to copy. + */ + private Builder(com.acme.example.kafka.models.Order.Builder other) { + super(other); + if (isValidValue(fields()[0], other.orderId)) { + this.orderId = data().deepCopy(fields()[0].schema(), other.orderId); + fieldSetFlags()[0] = other.fieldSetFlags()[0]; + } + if (isValidValue(fields()[1], other.itemId)) { + this.itemId = data().deepCopy(fields()[1].schema(), other.itemId); + fieldSetFlags()[1] = other.fieldSetFlags()[1]; + } + if (isValidValue(fields()[2], other.userId)) { + this.userId = data().deepCopy(fields()[2].schema(), other.userId); + fieldSetFlags()[2] = other.fieldSetFlags()[2]; + } + if (isValidValue(fields()[3], other.quantity)) { + this.quantity = data().deepCopy(fields()[3].schema(), other.quantity); + fieldSetFlags()[3] = other.fieldSetFlags()[3]; + } + if (isValidValue(fields()[4], other.description)) { + this.description = data().deepCopy(fields()[4].schema(), other.description); + fieldSetFlags()[4] = other.fieldSetFlags()[4]; + } + } + + /** + * Creates a Builder by copying an existing Order instance + * @param other The existing instance to copy. + */ + private Builder(com.acme.example.kafka.models.Order other) { + super(SCHEMA$, MODEL$); + if (isValidValue(fields()[0], other.orderId)) { + this.orderId = data().deepCopy(fields()[0].schema(), other.orderId); + fieldSetFlags()[0] = true; + } + if (isValidValue(fields()[1], other.itemId)) { + this.itemId = data().deepCopy(fields()[1].schema(), other.itemId); + fieldSetFlags()[1] = true; + } + if (isValidValue(fields()[2], other.userId)) { + this.userId = data().deepCopy(fields()[2].schema(), other.userId); + fieldSetFlags()[2] = true; + } + if (isValidValue(fields()[3], other.quantity)) { + this.quantity = data().deepCopy(fields()[3].schema(), other.quantity); + fieldSetFlags()[3] = true; + } + if (isValidValue(fields()[4], other.description)) { + this.description = data().deepCopy(fields()[4].schema(), other.description); + fieldSetFlags()[4] = true; + } + } + + /** + * Gets the value of the 'orderId' field. + * Unique id of the order. + * @return The value. + */ + public int getOrderId() { + return orderId; + } + + + /** + * Sets the value of the 'orderId' field. + * Unique id of the order. + * @param value The value of 'orderId'. + * @return This builder. + */ + public com.acme.example.kafka.models.Order.Builder setOrderId(int value) { + validate(fields()[0], value); + this.orderId = value; + fieldSetFlags()[0] = true; + return this; + } + + /** + * Checks whether the 'orderId' field has been set. + * Unique id of the order. + * @return True if the 'orderId' field has been set, false otherwise. + */ + public boolean hasOrderId() { + return fieldSetFlags()[0]; + } + + + /** + * Clears the value of the 'orderId' field. + * Unique id of the order. + * @return This builder. + */ + public com.acme.example.kafka.models.Order.Builder clearOrderId() { + fieldSetFlags()[0] = false; + return this; + } + + /** + * Gets the value of the 'itemId' field. + * Id of the ordered item. + * @return The value. + */ + public java.lang.String getItemId() { + return itemId; + } + + + /** + * Sets the value of the 'itemId' field. + * Id of the ordered item. + * @param value The value of 'itemId'. + * @return This builder. + */ + public com.acme.example.kafka.models.Order.Builder setItemId(java.lang.String value) { + validate(fields()[1], value); + this.itemId = value; + fieldSetFlags()[1] = true; + return this; + } + + /** + * Checks whether the 'itemId' field has been set. + * Id of the ordered item. + * @return True if the 'itemId' field has been set, false otherwise. + */ + public boolean hasItemId() { + return fieldSetFlags()[1]; + } + + + /** + * Clears the value of the 'itemId' field. + * Id of the ordered item. + * @return This builder. + */ + public com.acme.example.kafka.models.Order.Builder clearItemId() { + itemId = null; + fieldSetFlags()[1] = false; + return this; + } + + /** + * Gets the value of the 'userId' field. + * Id of the user who ordered the item. + * @return The value. + */ + public java.lang.String getUserId() { + return userId; + } + + + /** + * Sets the value of the 'userId' field. + * Id of the user who ordered the item. + * @param value The value of 'userId'. + * @return This builder. + */ + public com.acme.example.kafka.models.Order.Builder setUserId(java.lang.String value) { + validate(fields()[2], value); + this.userId = value; + fieldSetFlags()[2] = true; + return this; + } + + /** + * Checks whether the 'userId' field has been set. + * Id of the user who ordered the item. + * @return True if the 'userId' field has been set, false otherwise. + */ + public boolean hasUserId() { + return fieldSetFlags()[2]; + } + + + /** + * Clears the value of the 'userId' field. + * Id of the user who ordered the item. + * @return This builder. + */ + public com.acme.example.kafka.models.Order.Builder clearUserId() { + userId = null; + fieldSetFlags()[2] = false; + return this; + } + + /** + * Gets the value of the 'quantity' field. + * How much was ordered. + * @return The value. + */ + public double getQuantity() { + return quantity; + } + + + /** + * Sets the value of the 'quantity' field. + * How much was ordered. + * @param value The value of 'quantity'. + * @return This builder. + */ + public com.acme.example.kafka.models.Order.Builder setQuantity(double value) { + validate(fields()[3], value); + this.quantity = value; + fieldSetFlags()[3] = true; + return this; + } + + /** + * Checks whether the 'quantity' field has been set. + * How much was ordered. + * @return True if the 'quantity' field has been set, false otherwise. + */ + public boolean hasQuantity() { + return fieldSetFlags()[3]; + } + + + /** + * Clears the value of the 'quantity' field. + * How much was ordered. + * @return This builder. + */ + public com.acme.example.kafka.models.Order.Builder clearQuantity() { + fieldSetFlags()[3] = false; + return this; + } + + /** + * Gets the value of the 'description' field. + * Description of item. + * @return The value. + */ + public java.lang.String getDescription() { + return description; + } + + + /** + * Sets the value of the 'description' field. + * Description of item. + * @param value The value of 'description'. + * @return This builder. + */ + public com.acme.example.kafka.models.Order.Builder setDescription(java.lang.String value) { + validate(fields()[4], value); + this.description = value; + fieldSetFlags()[4] = true; + return this; + } + + /** + * Checks whether the 'description' field has been set. + * Description of item. + * @return True if the 'description' field has been set, false otherwise. + */ + public boolean hasDescription() { + return fieldSetFlags()[4]; + } + + + /** + * Clears the value of the 'description' field. + * Description of item. + * @return This builder. + */ + public com.acme.example.kafka.models.Order.Builder clearDescription() { + description = null; + fieldSetFlags()[4] = false; + return this; + } + + @Override + @SuppressWarnings("unchecked") + public Order build() { + try { + Order record = new Order(); + record.orderId = fieldSetFlags()[0] ? this.orderId : (java.lang.Integer) defaultValue(fields()[0]); + record.itemId = fieldSetFlags()[1] ? this.itemId : (java.lang.String) defaultValue(fields()[1]); + record.userId = fieldSetFlags()[2] ? this.userId : (java.lang.String) defaultValue(fields()[2]); + record.quantity = fieldSetFlags()[3] ? this.quantity : (java.lang.Double) defaultValue(fields()[3]); + record.description = fieldSetFlags()[4] ? this.description : (java.lang.String) defaultValue(fields()[4]); + return record; + } catch (org.apache.avro.AvroMissingFieldException e) { + throw e; + } catch (java.lang.Exception e) { + throw new org.apache.avro.AvroRuntimeException(e); + } + } + } + + @SuppressWarnings("unchecked") + private static final org.apache.avro.io.DatumWriter<Order> + WRITER$ = (org.apache.avro.io.DatumWriter<Order>)MODEL$.createDatumWriter(SCHEMA$); + + @Override public void writeExternal(java.io.ObjectOutput out) + throws java.io.IOException { + WRITER$.write(this, SpecificData.getEncoder(out)); + } + + @SuppressWarnings("unchecked") + private static final org.apache.avro.io.DatumReader<Order> + READER$ = (org.apache.avro.io.DatumReader<Order>)MODEL$.createDatumReader(SCHEMA$); + + @Override public void readExternal(java.io.ObjectInput in) + throws java.io.IOException { + READER$.read(this, SpecificData.getDecoder(in)); + } + + @Override protected boolean hasCustomCoders() { return true; } + + @Override public void customEncode(org.apache.avro.io.Encoder out) + throws java.io.IOException + { + out.writeInt(this.orderId); + + out.writeString(this.itemId); + + out.writeString(this.userId); + + out.writeDouble(this.quantity); + + out.writeString(this.description); + + } + + @Override public void customDecode(org.apache.avro.io.ResolvingDecoder in) + throws java.io.IOException + { + org.apache.avro.Schema.Field[] fieldOrder = in.readFieldOrderIfDiff(); + if (fieldOrder == null) { + this.orderId = in.readInt(); + + this.itemId = in.readString(); + + this.userId = in.readString(); + + this.quantity = in.readDouble(); + + this.description = in.readString(); + + } else { + for (int i = 0; i < 5; i++) { + switch (fieldOrder[i].pos()) { + case 0: + this.orderId = in.readInt(); + break; + + case 1: + this.itemId = in.readString(); + break; + + case 2: + this.userId = in.readString(); + break; + + case 3: + this.quantity = in.readDouble(); + break; + + case 4: + this.description = in.readString(); + break; + + default: + throw new java.io.IOException("Corrupt ResolvingDecoder."); + } + } + } + } +} + + + + + + + + + +