[ 
https://issues.apache.org/jira/browse/CAMEL-20864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17855517#comment-17855517
 ] 

Claus Ibsen commented on CAMEL-20864:
-------------------------------------

You are welcome to add docs: click "Edit this page" at the bottom of

[https://camel.apache.org/components/next/kafka-component.html]

and send the doc changes as a PR.

> camel-kafka - With confluent schema registry does not work properly.
> --------------------------------------------------------------------
>
>                 Key: CAMEL-20864
>                 URL: https://issues.apache.org/jira/browse/CAMEL-20864
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 3.22.2
>         Environment: Confluent Kafka
> Camel-3.22.x
> Java 17
>            Reporter: Kartik
>            Assignee: Claus Ibsen
>            Priority: Minor
>              Labels: camel-kafka
>             Fix For: 3.21.5, 3.22.3, 4.0.6, 4.4.3, 4.7.0
>
>         Attachments: image-2024-06-12-11-05-12-616.png, 
> image-2024-06-12-11-06-33-915.png, image-2024-06-12-12-51-58-966.png
>
>
> In Confluent Kafka, a topic can be registered for schema validation against 
> the schema registry. When this is configured, the Confluent documentation says 
> that we should either have a POJO defined in the code that is used for 
> serialization/deserialization *OR* create a custom "ObjectNode" with their 
> schema utils. The document is linked below:
> [https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/serdes-json.html#sending-a-jsonnode-payload]
>  
>  
> In our case, we have different schemas registered and can't define POJOs 
> for all of them, because the schemas are registered at run time. So we are 
> using the code below to generate an object from the schema.
> {code:java}
> Map<String, Object> config = new HashMap<>();
> config.put("basic.auth.credentials.source", "USER_INFO");
> config.put("basic.auth.user.info", "<secret-key>");
> config.put("auto.register.schemas", false);
> config.put("use.latest.version", true);
> CachedSchemaRegistryClient registryClient =
>         new CachedSchemaRegistryClient("<registry-url>", 10, config);
> String schemaDoc =
>         registryClient.getLatestSchemaMetadata("topicTest-value").getSchema();
> JsonSchema schema = new JsonSchema(schemaDoc);
> ObjectMapper mapper = new ObjectMapper();
> JsonNode jsonNode =
>         mapper.readTree("{\"myField1\":123,\"myField2\":123.32,\"myField3\":\"pqr\"}");
> ObjectNode envelope = JsonSchemaUtils.envelope(schema, jsonNode);
> from("timer://foo?fixedRate=true&period=60000")
>         .setBody(ExpressionBuilder.constantExpression(envelope)) // Message to send
>         .log("Sent new message")
>         .to(kafkaEndpoint); {code}
> If the "ObjectNode" payload is written directly using the kafka-client 
> library, it works. But when it is written using the Camel component, the 
> "KafkaProducer" in the Camel component does an "isIterable" check and, if it 
> is true, sends each value separately. This doesn't work for Confluent Kafka, 
> because the custom 
> "{*}io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer{*}" expects 
> a whole object.
> !image-2024-06-12-11-05-12-616.png|width=733,height=459!
>  
>  The code in 
> "io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer" expects a 
> whole object.
> !image-2024-06-12-11-06-33-915.png|width=906,height=438!
>  
> In simple words, the "envelope" object created is no longer sent as a single 
> object. Instead, its values are iterated and sent independently, resulting 
> in a schema validation error.
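
The behaviour described above can be illustrated with a minimal, self-contained sketch. It does not use Camel, Jackson, or the Confluent libraries: the Envelope class and the sendSplitting/sendWhole methods are hypothetical stand-ins, and this is not the actual KafkaProducer code from camel-kafka. The only assumption carried over from the real libraries is that Jackson's JsonNode (and therefore ObjectNode) implements Iterable<JsonNode>, which is why an iterable check matches the envelope.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class IterableSplitSketch {

    // Hypothetical stand-in for an ObjectNode: one logical object that is
    // also iterable over its field values.
    static final class Envelope implements Iterable<Object> {
        private final Map<String, Object> fields = new LinkedHashMap<>();

        Envelope put(String name, Object value) {
            fields.put(name, value);
            return this;
        }

        @Override
        public Iterator<Object> iterator() {
            return fields.values().iterator();
        }
    }

    // Hypothetical producer logic that treats any iterable body as a batch
    // and emits one record per element (the behaviour the report describes).
    static List<Object> sendSplitting(Object body) {
        List<Object> records = new ArrayList<>();
        if (body instanceof Iterable<?>) {
            for (Object element : (Iterable<?>) body) {
                records.add(element);
            }
        } else {
            records.add(body);
        }
        return records;
    }

    // Hypothetical producer logic that always emits the body as one record,
    // which is what a whole-object serializer needs.
    static List<Object> sendWhole(Object body) {
        List<Object> records = new ArrayList<>();
        records.add(body);
        return records;
    }

    public static void main(String[] args) {
        Envelope envelope = new Envelope()
                .put("myField1", 123)
                .put("myField2", 123.32)
                .put("myField3", "pqr");

        // The envelope is split into its three field values.
        System.out.println(sendSplitting(envelope).size()); // prints 3
        // The envelope is passed on unchanged as a single record.
        System.out.println(sendWhole(envelope).size());     // prints 1
    }
}
```

In the split case, each record handed to the serializer is a bare field value rather than the schema envelope, which is consistent with the schema validation error reported here.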



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
