Thanks for your response.
We have different messages with separate schemas.

I'll review the suggested solution.
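
For the apicurio.registry.as-confluent option mentioned below, here is a
rough sketch of what "configuring the producers accordingly" might involve.
It only builds a plain java.util.Properties object, so it needs no Kafka or
Apicurio dependency. The class and method names are made up for
illustration, and the apicurio.registry.headers.enabled key is my assumption
about how to get the schema id written into the payload instead of the
record headers. Both keys should be checked against the Apicurio version in
use.

```java
import java.util.Properties;

// Hypothetical helper, not part of Beam, Kafka, or Apicurio.
final class ProducerConfigSketch {

    // Builds producer properties that ask the Apicurio serializer to write
    // the schema id in the Confluent wire format.
    static Properties confluentCompatibleProducerProps(String bootstrapServers,
                                                       String registryUrl) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "io.apicurio.registry.serde.avro.AvroKafkaSerializer");
        // Same key that SerdeConfig.REGISTRY_URL refers to.
        props.put("apicurio.registry.url", registryUrl);
        // The setting suggested in the thread.
        props.put("apicurio.registry.as-confluent", "true");
        // Assumption: the id must travel in the payload, not in headers.
        props.put("apicurio.registry.headers.enabled", "false");
        return props;
    }
}
```

The resulting Properties would be handed to the KafkaProducer of each
producing service. Whether the consumer side can then use
ConfluentSchemaRegistryDeserializerProvider unchanged is something I have
not verified.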
BR
Sigalit

On Tue, Aug 9, 2022 at 3:40 PM Moritz Mack <mm...@talend.com> wrote:

> Hi Sigalit,
>
>
>
> Could you explain a bit more in detail what you mean by 2 different types
> of messages?
>
> Do they share the same schema, e.g. using a union / one of type? Or are
> you in fact talking about different messages with separate schemas (e.g.
> discriminated using a message header)?
>
>
>
> The recommended usage (at least with Confluent) is to use one schema per
> topic. Using the Confluent registry it’s fairly simple then:
>
>
>
>     .withValueDeserializer(
>         ConfluentSchemaRegistryDeserializerProvider.of(
>             registryUrl, subject, null /* latest */, config)))
>
>
>
> Most likely you will have to implement a similar DeserializerProvider for
> Apicurio. You could also try using apicurio.registry.as-confluent, but
> that requires configuring your producers accordingly.
>
> In any case, I suggest you study
> ConfluentSchemaRegistryDeserializerProvider. That should show you a path
> forward.
>
>
>
> Best,
>
> Moritz
>
>
>
> On 09.08.22, 13:08, "Sigalit Eliazov" <e.siga...@gmail.com> wrote:
>
>
>
> Hi all,
>
> We have a single Kafka topic which is used to receive 2 different types of
> messages.
>
> Both message types are Avro.
>
> So when reading messages from Kafka I used GenericRecord:
>
>
>
> KafkaIO.<String, GenericRecord>read()
>         .withBootstrapServers(bootstrapServers)
>         .withTopic(topic)
>         .withConsumerConfigUpdates(ImmutableMap.of(
>                 SerdeConfig.REGISTRY_URL, PipelineUtil.getSchemaURL(),
>                 ConsumerConfig.GROUP_ID_CONFIG, consumerGroup,
>                 SerdeConfig.CHECK_PERIOD_MS, TimeUnit.DAYS.toMillis(1)
>         ))
>         .withKeyDeserializer(StringDeserializer.class)
>
> I am not sure how to define the withValueDeserializer and the coder.
>
> I tried to read the message as GenericRecord, but it fails with
>
>  "Could not extract the Kafka Deserializer type from class
> io.apicurio.registry.serde.avro.AvroKafkaDeserialize"
>
> I am using Apicurio as the schema registry.
>
>
>
> Thanks
>
> Sigalit
>
