Hi Sigalit,

Could you explain in a bit more detail what you mean by 2 different types of 
messages?
Do they share the same schema, e.g. using a union / one-of type? Or are you in 
fact talking about different messages with separate schemas (e.g. discriminated 
using a message header)?

The recommended usage (at least with Confluent) is to use one schema per topic. 
Using the Confluent registry it’s fairly simple then:

             .withValueDeserializer(
                    ConfluentSchemaRegistryDeserializerProvider.of(
                            registryUrl, subject, null /* latest */, config))

Most likely you have to implement a similar DeserializerProvider for Apicurio. 
You could also try using apicurio.registry.as-confluent, but that requires 
configuring your producers accordingly.
In any case, I suggest you study ConfluentSchemaRegistryDeserializerProvider. 
That should show you a path forward.
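
For background on the as-confluent option: the Confluent wire format prefixes 
each Avro payload with a magic byte (0) followed by a 4-byte big-endian schema 
ID. If your two message types are registered under different schema IDs, that 
ID is one way to tell them apart. Below is a rough sketch of reading it using 
only the JDK. The class and method names are made up for illustration and are 
not part of Beam, Confluent or Apicurio, and it assumes your producers really 
write the Confluent framing.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper, not part of Beam, Confluent or Apicurio. */
final class ConfluentFraming {
    // Confluent wire format: 1 magic byte (0), 4-byte schema ID, Avro payload.
    private static final byte MAGIC_BYTE = 0x0;
    private static final int HEADER_SIZE = 5;

    private ConfluentFraming() {}

    /** Returns the schema ID stored in the header of a framed record value. */
    static int schemaId(byte[] value) {
        if (value == null || value.length < HEADER_SIZE) {
            throw new IllegalArgumentException("Value too short for Confluent framing");
        }
        // ByteBuffer reads big-endian by default, which matches the wire format.
        ByteBuffer buffer = ByteBuffer.wrap(value);
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buffer.getInt();
    }
}
```

With the raw value bytes at hand, ConfluentFraming.schemaId(bytes) gives you 
the ID to look up in the registry or to branch on.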

Best,
Moritz

On 09.08.22, 13:08, "Sigalit Eliazov" <e.siga...@gmail.com> wrote:

Hi all,
we have a single Kafka topic which is used to receive 2 different types of 
messages.
Both message types are Avro.
So when reading messages from Kafka I used GenericRecord:


KafkaIO.<String, GenericRecord>read()
        .withBootstrapServers(bootstrapServers)
        .withTopic(topic)
        .withConsumerConfigUpdates(ImmutableMap.of(
                SerdeConfig.REGISTRY_URL, PipelineUtil.getSchemaURL(),
                ConsumerConfig.GROUP_ID_CONFIG, consumerGroup,
                SerdeConfig.CHECK_PERIOD_MS, TimeUnit.DAYS.toMillis(1)
        ))
        .withKeyDeserializer(StringDeserializer.class)

I am not sure how to define withValueDeserializer and the coder.

I tried to read the message as GenericRecord, but it fails with

 "Could not extract the Kafka Deserializer type from class io.apicurio.registry.serde.avro.AvroKafkaDeserialize"

I am using Apicurio as the schema registry.



Thanks

Sigalit
