Mine was the similar but 
"org.apache.beam.sdk.io.kafka,ConfluentSchemaRegistryDeserializerProvider" is 
leveraging 
“io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient” that I 
guessed should reduce this potential impact.

—
Alexey

> On 12 Apr 2023, at 17:36, John Casey via user <[email protected]> wrote:
> 
> My initial guess is that there are queries being made in order to retrieve 
> the schemas, which would impact performance, especially if those queries 
> aren't cached with Beam splitting in mind. 
> 
> I'm looking to improve our interaction with Kafka schemas in the next couple 
> of quarters, so I'll keep this case in mind while working on that.
> 
> John
> 
> On Tue, Apr 11, 2023 at 10:29 AM Alexey Romanenko <[email protected] 
> <mailto:[email protected]>> wrote:
>> I don’t have an exact answer why it’s so much slower for now (only some 
>> guesses but it requires some profiling), though could you try to test the 
>> same Kafka read but with “ConfluentSchemaRegistryDeserializerProvider” 
>> instead of KafkaAvroDeserializer and AvroCoder?
>> 
>> More details and an example how to use is here:
>> https://beam.apache.org/releases/javadoc/2.46.0/org/apache/beam/sdk/io/kafka/KafkaIO.html
>>  (go to “Use Avro schema with Confluent Schema Registry”)
>> 
>> —
>> Alexey
>> 
>> 
>> 
>>> On 10 Apr 2023, at 07:35, Sigalit Eliazov <[email protected] 
>>> <mailto:[email protected]>> wrote:
>>> 
>>> hi,
>>> KafkaIO.<String, T>read()
>>>         .withBootstrapServers(bootstrapServers)
>>>         .withTopic(topic)
>>>         .withConsumerConfigUpdates(Map.ofEntries(
>>>                 Map.entry("schema.registry.url", registryURL),
>>>                 Map.entry(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup+ 
>>> UUID.randomUUID()),
>>>         ))
>>>         .withKeyDeserializer(StringDeserializer.class)
>>>         .withValueDeserializerAndCoder((Class) 
>>> io.confluent.kafka.serializers.KafkaAvroDeserializer.class, 
>>> AvroCoder.of(avroClass));
>>> 
>>> Thanks
>>> Sigalit
>>> 
>>> On Mon, Apr 10, 2023 at 2:58 AM Reuven Lax via user <[email protected] 
>>> <mailto:[email protected]>> wrote:
>>>> How are you using the schema registry? Do you have a code sample?
>>>> 
>>>> On Sun, Apr 9, 2023 at 3:06 AM Sigalit Eliazov <[email protected] 
>>>> <mailto:[email protected]>> wrote:
>>>>> Hello,
>>>>> 
>>>>> I am trying to understand the effect of schema registry on our pipeline's 
>>>>> performance. In order to do sowe created a very simple pipeline that 
>>>>> reads from kafka, runs a simple transformation of adding new field and 
>>>>> writes of kafka.  the messages are in avro format
>>>>> 
>>>>> I ran this pipeline with 3 different options on same configuration : 1 
>>>>> kafka partition, 1 task manager, 1 slot, 1 parallelism:
>>>>> 
>>>>> * when i used apicurio as the schema registry i was able to process only 
>>>>> 2000 messages per second
>>>>> * when i used confluent schema registry i was able to process 7000 
>>>>> messages per second
>>>>> * when I did not use any schema registry and used plain avro 
>>>>> deserializer/serializer i was able to process 30K messages per second.
>>>>> 
>>>>> I understand that using a schema registry may cause a reduction in 
>>>>> performance but  in my opinion the difference is too high. 
>>>>> Any comments or suggestions about these results?
>>>>> 
>>>>> Thanks in advance
>>>>> Sigalit
>> 

Reply via email to