Thanks for testing this!

This requires some additional investigation, so I created an issue for it:

Feel free to add more details there if you have any.


> On 13 Apr 2023, at 12:45, Sigalit Eliazov <> wrote:
> I have made the suggested change and used
> ConfluentSchemaRegistryDeserializerProvider.
> The results are slightly better: an average of 8000 msg/sec.
> Thank you both for your responses. I'd appreciate it if you could keep me in
> the loop on the planned work with Kafka schemas, or let me know if I can assist
> in any way.
> Thanks
> Sigalit
> On Wed, Apr 12, 2023 at 8:00 PM Alexey Romanenko < 
> <>> wrote:
>> My guess was similar, but “ConfluentSchemaRegistryDeserializerProvider”
>> leverages “io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient”,
>> which I guessed should reduce this potential impact.
>> —
>> Alexey
>>> On 12 Apr 2023, at 17:36, John Casey via user < 
>>> <>> wrote:
>>> My initial guess is that there are queries being made in order to retrieve 
>>> the schemas, which would impact performance, especially if those queries 
>>> aren't cached with Beam splitting in mind. 
>>> I'm looking to improve our interaction with Kafka schemas in the next 
>>> couple of quarters, so I'll keep this case in mind while working on that.
>>> John
>>> On Tue, Apr 11, 2023 at 10:29 AM Alexey Romanenko < 
>>> <>> wrote:
>>>> I don’t have an exact answer yet as to why it’s so much slower (only some
>>>> guesses, and confirming them requires profiling). Could you try the
>>>> same Kafka read but with “ConfluentSchemaRegistryDeserializerProvider”
>>>> instead of KafkaAvroDeserializer and AvroCoder?
>>>> More details and an example of how to use it are here:
>>>>  (go to “Use Avro schema with Confluent Schema Registry”)
>>>> —
>>>> Alexey
>>>>> On 10 Apr 2023, at 07:35, Sigalit Eliazov < 
>>>>> <>> wrote:
>>>>> hi,
>>>>> KafkaIO.<String, T>read()
>>>>>         .withBootstrapServers(bootstrapServers)
>>>>>         .withTopic(topic)
>>>>>         .withConsumerConfigUpdates(Map.ofEntries(
>>>>>                 Map.entry("schema.registry.url", registryURL),
>>>>>                 Map.entry(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup + UUID.randomUUID())
>>>>>         ))
>>>>>         .withKeyDeserializer(StringDeserializer.class)
>>>>>         .withValueDeserializerAndCoder(
>>>>>                 (Class) io.confluent.kafka.serializers.KafkaAvroDeserializer.class,
>>>>>                 AvroCoder.of(avroClass));
>>>>> Thanks
>>>>> Sigalit
>>>>> On Mon, Apr 10, 2023 at 2:58 AM Reuven Lax via user < 
>>>>> <>> wrote:
>>>>>> How are you using the schema registry? Do you have a code sample?
>>>>>> On Sun, Apr 9, 2023 at 3:06 AM Sigalit Eliazov < 
>>>>>> <>> wrote:
>>>>>>> Hello,
>>>>>>> I am trying to understand the effect of a schema registry on our
>>>>>>> pipeline's performance. In order to do so, we created a very simple
>>>>>>> pipeline that reads from Kafka, runs a simple transformation that adds
>>>>>>> a new field, and writes to Kafka. The messages are in Avro format.
>>>>>>> I ran this pipeline with 3 different options on the same configuration: 1
>>>>>>> Kafka partition, 1 task manager, 1 slot, parallelism of 1:
>>>>>>> * When I used Apicurio as the schema registry, I was able to process
>>>>>>> only 2000 messages per second.
>>>>>>> * When I used Confluent Schema Registry, I was able to process 7000
>>>>>>> messages per second.
>>>>>>> * When I did not use any schema registry and used a plain Avro
>>>>>>> deserializer/serializer, I was able to process 30K messages per second.
>>>>>>> I understand that using a schema registry may cause a reduction in
>>>>>>> performance, but in my opinion the difference is too high.
>>>>>>> Any comments or suggestions about these results?
>>>>>>> Thanks in advance
>>>>>>> Sigalit
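
To put the throughput figures reported in this thread on a common scale, the sketch below converts each rate into an average per-message cost and into the extra time per message relative to the plain Avro run. The class and method names are hypothetical and chosen only for illustration; the rates (2000, 7000, 8000 and 30K messages per second) are the ones quoted above.

```java
public class PerMessageCost {
    // Average time spent per message, in microseconds, at a given throughput.
    static double microsPerMessage(double messagesPerSecond) {
        return 1_000_000.0 / messagesPerSecond;
    }

    // Extra per-message time relative to a faster baseline, in microseconds.
    static double overheadMicros(double messagesPerSecond, double baselineMessagesPerSecond) {
        return microsPerMessage(messagesPerSecond) - microsPerMessage(baselineMessagesPerSecond);
    }

    public static void main(String[] args) {
        double baseline = 30_000; // plain Avro deserializer/serializer, no registry
        String[] labels = {
            "Apicurio",
            "Confluent (KafkaAvroDeserializer)",
            "Confluent (DeserializerProvider)"
        };
        double[] rates = {2_000, 7_000, 8_000}; // messages per second, from the thread

        for (int i = 0; i < rates.length; i++) {
            System.out.printf("%s: %.1f us/msg, %.1f us/msg over baseline%n",
                    labels[i], microsPerMessage(rates[i]), overheadMicros(rates[i], baseline));
        }
    }
}
```

By this arithmetic, 2000 msg/sec is 500 microseconds per message and 30K msg/sec is about 33 microseconds, so the Apicurio run spends roughly 467 microseconds more per message on average. These are averages over a single-partition run and do not show where the time goes; that still needs profiling, as noted in the replies.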

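The guess discussed above is that schema lookups cost time when they are not cached. The following is a minimal, self-contained sketch of that idea only: it memoizes lookups by schema ID and counts how often the (simulated) registry is actually queried. It is not how Beam, Apicurio, or Confluent's CachedSchemaRegistryClient is implemented; `CachingSchemaLookup`, `schemaFor`, and `remoteCalls` are hypothetical names, and the registry call is replaced by a plain function.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntFunction;

public class CachingSchemaLookup {
    // Stand-in for a registry query (hypothetical; a real one would go over the network).
    private final IntFunction<String> remoteLookup;
    private final Map<Integer, String> cache = new ConcurrentHashMap<>();
    private final AtomicInteger remoteCalls = new AtomicInteger();

    public CachingSchemaLookup(IntFunction<String> remoteLookup) {
        this.remoteLookup = remoteLookup;
    }

    // Returns the schema for an ID, querying the registry only on the first request.
    public String schemaFor(int schemaId) {
        return cache.computeIfAbsent(schemaId, id -> {
            remoteCalls.incrementAndGet();
            return remoteLookup.apply(id);
        });
    }

    // Number of times the simulated registry was actually queried.
    public int remoteCalls() {
        return remoteCalls.get();
    }

    public static void main(String[] args) {
        CachingSchemaLookup lookup = new CachingSchemaLookup(id -> "{\"type\":\"string\"}");
        // Simulate 10,000 messages that all carry the same schema ID.
        for (int i = 0; i < 10_000; i++) {
            lookup.schemaFor(42);
        }
        System.out.println("remote lookups: " + lookup.remoteCalls()); // prints "remote lookups: 1"
    }
}
```

With a cache like this, the per-message cost of a lookup drops to a map read after the first message. If each worker or deserializer instance builds its own cache, every instance pays the first-lookup cost again, which is one way the caching could interact with how Beam splits work.
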