Hi,

This is how we configure the read when using the Confluent schema registry:

KafkaIO.<String, T>read()
        .withBootstrapServers(bootstrapServers)
        .withTopic(topic)
        .withConsumerConfigUpdates(Map.ofEntries(
                Map.entry("schema.registry.url", registryURL),
                // The random suffix gives every run a fresh consumer group.
                Map.entry(ConsumerConfig.GROUP_ID_CONFIG,
                        consumerGroup + UUID.randomUUID())
        ))
        .withKeyDeserializer(StringDeserializer.class)
        // Raw cast: KafkaAvroDeserializer implements Deserializer<Object>.
        .withValueDeserializerAndCoder(
                (Class) io.confluent.kafka.serializers.KafkaAvroDeserializer.class,
                AvroCoder.of(avroClass));
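
To put the throughput numbers from my first message (quoted below) in perspective, here is a small sketch that converts each one into an average time budget per message. It is only arithmetic on the reported figures; the class and method names are made up for illustration, and it says nothing about where the extra time is actually spent.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: per-message time budget implied by the throughput
// figures reported in this thread (1 partition, 1 slot, parallelism 1).
final class PerMessageBudget {

    // Average microseconds available per message at a given throughput.
    static double microsPerMessage(double messagesPerSecond) {
        return 1_000_000.0 / messagesPerSecond;
    }

    public static void main(String[] args) {
        Map<String, Double> throughput = new LinkedHashMap<>();
        throughput.put("apicurio registry", 2_000.0);
        throughput.put("confluent registry", 7_000.0);
        throughput.put("plain avro, no registry", 30_000.0);

        // The run without a registry serves as the baseline.
        double baseline = microsPerMessage(30_000.0);
        for (Map.Entry<String, Double> e : throughput.entrySet()) {
            double micros = microsPerMessage(e.getValue());
            System.out.printf("%-24s %7.1f us/msg (+%.1f us over plain avro)%n",
                    e.getKey(), micros, micros - baseline);
        }
    }
}
```

So the three runs correspond to roughly 500, 143 and 33 microseconds per message, meaning the registry-based setups add on the order of 110 to 470 microseconds per message on average compared with plain Avro.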


Thanks

Sigalit


On Mon, Apr 10, 2023 at 2:58 AM Reuven Lax via user <user@beam.apache.org>
wrote:

> How are you using the schema registry? Do you have a code sample?
>
> On Sun, Apr 9, 2023 at 3:06 AM Sigalit Eliazov <e.siga...@gmail.com>
> wrote:
>
>> Hello,
>>
>> I am trying to understand the effect of the schema registry on our
>> pipeline's performance. To do so, we created a very simple pipeline that
>> reads from Kafka, runs a simple transformation that adds a new field, and
>> writes to Kafka. The messages are in Avro format.
>>
>> I ran this pipeline with 3 different options on the same configuration
>> (1 Kafka partition, 1 task manager, 1 slot, parallelism 1):
>>
>> * When I used Apicurio as the schema registry, I was able to process only
>> 2,000 messages per second.
>> * When I used the Confluent schema registry, I was able to process 7,000
>> messages per second.
>> * When I did not use any schema registry and used a plain Avro
>> deserializer/serializer, I was able to process *30K* messages per second.
>>
>> I understand that using a schema registry may reduce performance, but in
>> my opinion the difference is too large.
>> Any comments or suggestions about these results?
>>
>> Thanks in advance
>> Sigalit
>>
>
