[ https://issues.apache.org/jira/browse/CAMEL-20864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17855517#comment-17855517 ]
Claus Ibsen commented on CAMEL-20864:
-------------------------------------

You are welcome to add docs: click "edit this page" at the bottom of

[https://camel.apache.org/components/next/kafka-component.html]

and send the doc changes as a PR.

> camel-kafka - With confluent schema registry does not work properly.
> --------------------------------------------------------------------
>
>                 Key: CAMEL-20864
>                 URL: https://issues.apache.org/jira/browse/CAMEL-20864
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 3.22.2
>         Environment: Confluent Kafka
> Camel-3.22.x
> Java 17
>            Reporter: Kartik
>            Assignee: Claus Ibsen
>            Priority: Minor
>              Labels: camel-kafka
>             Fix For: 3.21.5, 3.22.3, 4.0.6, 4.4.3, 4.7.0
>
>         Attachments: image-2024-06-12-11-05-12-616.png,
> image-2024-06-12-11-06-33-915.png, image-2024-06-12-12-51-58-966.png
>
>
> In Confluent Kafka, a topic can be registered for schema validation against
> the schema registry. When this is configured, the Confluent documentation
> says we should either have a POJO defined in the code that is used for
> serialization/deserialization *OR* create a custom "ObjectNode" with
> their schema utils. The document is linked below:
> [https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/serdes-json.html#sending-a-jsonnode-payload]
>
>
> In our case, we have different schemas registered and cannot define a POJO
> for each of them, because the schemas are registered at run time. We are
> therefore using the code below to generate an object from the schema.
> {code:java}
> // Schema registry client settings
> Map<String, Object> config = new HashMap<>();
> config.put("basic.auth.credentials.source", "USER_INFO");
> config.put("basic.auth.user.info", "<secret-key>");
> config.put("auto.register.schemas", false);
> config.put("use.latest.version", true);
> CachedSchemaRegistryClient registryClient =
>     new CachedSchemaRegistryClient("<registry-url>", 10, config);
> // Fetch the latest schema registered for the topic's value subject
> String schemaDoc =
>     registryClient.getLatestSchemaMetadata("topicTest-value").getSchema();
> JsonSchema schema = new JsonSchema(schemaDoc);
> ObjectMapper mapper = new ObjectMapper();
> JsonNode jsonNode =
>     mapper.readTree("{\"myField1\":123,\"myField2\":123.32,\"myField3\":\"pqr\"}");
> // Wrap schema and payload in a single envelope object
> ObjectNode envelope = JsonSchemaUtils.envelope(schema, jsonNode);
> from("timer://foo?fixedRate=true&period=60000")
>     .setBody(ExpressionBuilder.constantExpression(envelope))
>     .log("Sent new message") // Message to send
>     .to(kafkaEndpoint);
> {code}
> If the "ObjectNode" payload is written directly with the kafka-client
> library, it works. When it is written through the Camel component, however,
> the "KafkaProducer" in the component performs an "isIterable" check and, if
> it is true, sends each value separately. This does not work for Confluent
> Kafka, because the custom
> "{*}io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer{*}" expects
> a whole object.
> !image-2024-06-12-11-05-12-616.png|width=733,height=459!
>
> The code in
> "io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer" expects the
> whole object.
> !image-2024-06-12-11-06-33-915.png|width=906,height=438!
>
> In simple words, the "envelope" object that was created is no longer sent as
> one object: it is iterated, and its values are sent independently,
> resulting in a schema validation error.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
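
The splitting behaviour described in the issue can be illustrated with a small, self-contained sketch. It is not the camel-kafka code: `Envelope` is a hypothetical stand-in for the Jackson "ObjectNode" envelope (assumed here to be iterable over its field values), and `recordsWithIterableCheck` and `recordsAsWhole` are made-up helper names that only mimic the two ways a producer could treat the message body.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class IterableSplitSketch {

    /** Hypothetical stand-in for the envelope: iterating it yields its field values. */
    public static final class Envelope implements Iterable<Object> {
        private final Map<String, Object> fields = new LinkedHashMap<>();

        public Envelope(Object schema, Object payload) {
            fields.put("schema", schema);
            fields.put("payload", payload);
        }

        @Override
        public Iterator<Object> iterator() {
            return fields.values().iterator();
        }
    }

    /** Mimics an "is iterable" check: an Iterable body becomes one record per element. */
    public static List<Object> recordsWithIterableCheck(Object body) {
        List<Object> records = new ArrayList<>();
        if (body instanceof Iterable<?>) {
            for (Object element : (Iterable<?>) body) {
                records.add(element);
            }
        } else {
            records.add(body);
        }
        return records;
    }

    /** What a whole-object serializer needs: the body passed through as a single record. */
    public static List<Object> recordsAsWhole(Object body) {
        List<Object> records = new ArrayList<>();
        records.add(body);
        return records;
    }

    public static void main(String[] args) {
        Envelope envelope = new Envelope("{\"type\":\"object\"}", "{\"myField1\":123}");
        System.out.println("with iterable check: " + recordsWithIterableCheck(envelope).size());
        System.out.println("sent as a whole: " + recordsAsWhole(envelope).size());
    }
}
```

In this sketch the iterable check turns one envelope into two separate records (the schema and the payload), neither of which is the object the serializer was meant to receive, while the pass-through variant keeps the envelope intact.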