I have made the suggested change and used
ConfluentSchemaRegistryDeserializerProvider.
The results are slightly better: an average of 8000 msg/sec.
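
To put that figure next to the earlier ones from this thread (2000, 7000 and roughly 30K msg/sec), here is a small sketch of the arithmetic. It only restates the reported averages; the class and method names are made up for illustration, and the per-message time assumes a single thread processing one message at a time. On those assumptions, 8000 msg/sec is about 27% of the no-registry baseline, or 125 microseconds per message against roughly 33.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Compares the throughput figures reported in this thread against the
// no-registry baseline. Class and method names are illustrative only.
class ThroughputComparison {

    // Throughput expressed as a percentage of the baseline throughput.
    static double percentOfBaseline(double msgPerSec, double baselineMsgPerSec) {
        return 100.0 * msgPerSec / baselineMsgPerSec;
    }

    // Average time spent per message in microseconds, assuming one thread
    // that handles messages strictly one after another.
    static double microsPerMessage(double msgPerSec) {
        return 1_000_000.0 / msgPerSec;
    }

    public static void main(String[] args) {
        double baseline = 30_000.0; // plain Avro, no schema registry ("30K")

        // Averages as reported in the thread, in the order they were measured.
        Map<String, Double> runs = new LinkedHashMap<>();
        runs.put("Apicurio registry", 2_000.0);
        runs.put("Confluent registry, first run", 7_000.0);
        runs.put("ConfluentSchemaRegistryDeserializerProvider", 8_000.0);
        runs.put("plain Avro, no registry", baseline);

        for (Map.Entry<String, Double> run : runs.entrySet()) {
            System.out.printf("%-45s %6.0f msg/sec  %5.1f%% of baseline  %6.1f us/msg%n",
                    run.getKey(),
                    run.getValue(),
                    percentOfBaseline(run.getValue(), baseline),
                    microsPerMessage(run.getValue()));
        }
    }
}
```

The per-message view may be the more useful one for profiling: the gap between the registry-based run and the baseline is on the order of 90 microseconds per message, which is the budget any per-message overhead would have to fit into.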

Thank you both for your responses. I would appreciate it if you could keep me in
the loop on the planned work with Kafka schemas, or let me know if I can
assist in any way.

Thanks
Sigalit

On Wed, Apr 12, 2023 at 8:00 PM Alexey Romanenko <aromanenko....@gmail.com>
wrote:

> Mine was similar, but
> "org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider"
> leverages
> “io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient”, which
> I guessed should reduce this potential impact.
>
> —
> Alexey
>
> On 12 Apr 2023, at 17:36, John Casey via user <user@beam.apache.org>
> wrote:
>
> My initial guess is that there are queries being made in order to
> retrieve the schemas, which would impact performance, especially if those
> queries aren't cached with Beam splitting in mind.
>
> I'm looking to improve our interaction with Kafka schemas in the next
> couple of quarters, so I'll keep this case in mind while working on that.
>
> John
>
> On Tue, Apr 11, 2023 at 10:29 AM Alexey Romanenko <
> aromanenko....@gmail.com> wrote:
>
>> I don’t have an exact answer yet for why it’s so much slower (only some
>> guesses, which would need profiling to confirm), but could you try the
>> same Kafka read with “ConfluentSchemaRegistryDeserializerProvider”
>> instead of KafkaAvroDeserializer and AvroCoder?
>>
>> More details and an example of how to use it are here:
>>
>> https://beam.apache.org/releases/javadoc/2.46.0/org/apache/beam/sdk/io/kafka/KafkaIO.html
>> (go to “Use Avro schema with Confluent Schema Registry”)
>>
>> —
>> Alexey
>>
>>
>>
>> On 10 Apr 2023, at 07:35, Sigalit Eliazov <e.siga...@gmail.com> wrote:
>>
>> hi,
>>
>> KafkaIO.<String, T>read()
>>         .withBootstrapServers(bootstrapServers)
>>         .withTopic(topic)
>>         .withConsumerConfigUpdates(Map.ofEntries(
>>                 Map.entry("schema.registry.url", registryURL),
>>                 Map.entry(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup + UUID.randomUUID())
>>         ))
>>         .withKeyDeserializer(StringDeserializer.class)
>>         .withValueDeserializerAndCoder(
>>                 (Class) io.confluent.kafka.serializers.KafkaAvroDeserializer.class,
>>                 AvroCoder.of(avroClass));
>>
>>
>> Thanks
>>
>> Sigalit
>>
>>
>> On Mon, Apr 10, 2023 at 2:58 AM Reuven Lax via user <user@beam.apache.org>
>> wrote:
>>
>>> How are you using the schema registry? Do you have a code sample?
>>>
>>> On Sun, Apr 9, 2023 at 3:06 AM Sigalit Eliazov <e.siga...@gmail.com>
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> I am trying to understand the effect of a schema registry on our
>>>> pipeline's performance. In order to do so, we created a very simple pipeline
>>>> that reads from Kafka, runs a simple transformation that adds a new field, and
>>>> writes to Kafka. The messages are in Avro format.
>>>>
>>>> I ran this pipeline with 3 different options on the same configuration
>>>> (1 Kafka partition, 1 task manager, 1 slot, parallelism of 1):
>>>>
>>>> * when I used Apicurio as the schema registry, I was able to process
>>>> only 2000 messages per second
>>>> * when I used Confluent Schema Registry, I was able to process 7000
>>>> messages per second
>>>> * when I did not use any schema registry and used a plain Avro
>>>> deserializer/serializer, I was able to process *30K* messages per
>>>> second.
>>>>
>>>> I understand that using a schema registry may cause a reduction in
>>>> performance, but in my opinion the difference is too large.
>>>> Any comments or suggestions about these results?
>>>>
>>>> Thanks in advance
>>>> Sigalit
>>>>
>>>
>>
>
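
One guess raised in the thread is that schema lookups are being sent to the registry more often than necessary. As a rough illustration of what caching those lookups means, here is a minimal sketch that keeps schemas in a map keyed by schema id, so the slow call happens once per id rather than once per message. This is not Beam, Confluent or Apicurio code: the class, the `registryFetch` function and the placeholder schema strings are all hypothetical, and the real clients may behave differently.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntFunction;

// Hypothetical sketch: caches schema lookups by schema id so that the slow
// registry call runs once per id instead of once per message.
// Not Beam or Confluent code; every name here is made up for illustration.
class CachingSchemaLookup {

    // Stands in for a remote call to a schema registry (an assumption).
    private final IntFunction<String> registryFetch;

    // Schemas already fetched, keyed by schema id.
    private final Map<Integer, String> cache = new ConcurrentHashMap<>();

    CachingSchemaLookup(IntFunction<String> registryFetch) {
        this.registryFetch = registryFetch;
    }

    // Returns the cached schema, calling the registry only on a cache miss.
    String schemaFor(int schemaId) {
        return cache.computeIfAbsent(schemaId, registryFetch::apply);
    }

    public static void main(String[] args) {
        AtomicInteger remoteCalls = new AtomicInteger();

        // Fake registry that counts how often it is asked for a schema.
        CachingSchemaLookup lookup = new CachingSchemaLookup(id -> {
            remoteCalls.incrementAndGet();
            return "schema-" + id; // placeholder for a real Avro schema
        });

        // Simulate many messages that all carry the same schema id.
        for (int i = 0; i < 10_000; i++) {
            lookup.schemaFor(1);
        }
        System.out.println("remote calls: " + remoteCalls.get());
    }
}
```

Note that the cache lives inside one instance. If a fresh instance were created for every split or bundle, each would start with an empty cache, so where the cache is held matters as much as whether one exists; profiling would be needed to tell whether that is what happens here.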
