I made the suggested change and used ConfluentSchemaRegistryDeserializerProvider. The results are slightly better: an average of 8000 msg/sec.
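
For anyone following the thread, the change looks roughly like the sketch below. It is modeled on the "Use Avro schema with Confluent Schema Registry" example in the KafkaIO javadoc, not copied from the actual pipeline. The class name RegistryReadSketch, the method buildRead, the GenericRecord value type, and the "<topic>-value" subject name are assumptions made for illustration; the subject name in particular depends on how the registry is configured.

```java
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.StringDeserializer;

// Hypothetical helper class, for illustration only.
public class RegistryReadSketch {

  // Builds a Kafka read whose value deserializer and coder both come from
  // the Confluent Schema Registry provider, so no explicit AvroCoder is set.
  static KafkaIO.Read<String, GenericRecord> buildRead(
      String bootstrapServers, String topic, String registryUrl) {
    // Assumption: values are registered under the "<topic>-value" subject.
    String subject = topic + "-value";

    return KafkaIO.<String, GenericRecord>read()
        .withBootstrapServers(bootstrapServers)
        .withTopic(topic)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(
            ConfluentSchemaRegistryDeserializerProvider.of(registryUrl, subject));
  }
}
```

Compared with the original read, this drops KafkaAvroDeserializer, AvroCoder and the "schema.registry.url" consumer property, since the registry URL is passed to the provider directly.
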
Thank you both for your responses. I would appreciate it if you could keep me in the loop on the planned work with Kafka schemas, or let me know if I can assist in any way.

Thanks,
Sigalit

On Wed, Apr 12, 2023 at 8:00 PM Alexey Romanenko <aromanenko....@gmail.com> wrote:

> Mine was similar, but
> "org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider"
> leverages
> “io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient”, which
> I guessed should reduce this potential impact.
>
> --
> Alexey
>
> On 12 Apr 2023, at 17:36, John Casey via user <user@beam.apache.org>
> wrote:
>
> My initial guess is that there are queries being made in order to
> retrieve the schemas, which would impact performance, especially if those
> queries aren't cached with Beam splitting in mind.
>
> I'm looking to improve our interaction with Kafka schemas in the next
> couple of quarters, so I'll keep this case in mind while working on that.
>
> John
>
> On Tue, Apr 11, 2023 at 10:29 AM Alexey Romanenko <
> aromanenko....@gmail.com> wrote:
>
>> I don’t have an exact answer why it’s so much slower for now (only some
>> guesses, and it requires some profiling), though could you try to test the
>> same Kafka read but with “ConfluentSchemaRegistryDeserializerProvider”
>> instead of KafkaAvroDeserializer and AvroCoder?
>>
>> More details and an example of how to use it are here:
>>
>> https://beam.apache.org/releases/javadoc/2.46.0/org/apache/beam/sdk/io/kafka/KafkaIO.html
>> (go to “Use Avro schema with Confluent Schema Registry”)
>>
>> --
>> Alexey
>>
>> On 10 Apr 2023, at 07:35, Sigalit Eliazov <e.siga...@gmail.com> wrote:
>>
>> hi,
>>
>> KafkaIO.<String, T>read()
>>     .withBootstrapServers(bootstrapServers)
>>     .withTopic(topic)
>>     .withConsumerConfigUpdates(Map.ofEntries(
>>         Map.entry("schema.registry.url", registryURL),
>>         // a random suffix gives every run a fresh consumer group
>>         Map.entry(ConsumerConfig.GROUP_ID_CONFIG,
>>             consumerGroup + UUID.randomUUID())
>>     ))
>>     .withKeyDeserializer(StringDeserializer.class)
>>     // raw cast: KafkaAvroDeserializer is not typed as Deserializer<T>
>>     .withValueDeserializerAndCoder((Class)
>>         io.confluent.kafka.serializers.KafkaAvroDeserializer.class,
>>         AvroCoder.of(avroClass));
>>
>> Thanks
>>
>> Sigalit
>>
>> On Mon, Apr 10, 2023 at 2:58 AM Reuven Lax via user <user@beam.apache.org>
>> wrote:
>>
>>> How are you using the schema registry? Do you have a code sample?
>>>
>>> On Sun, Apr 9, 2023 at 3:06 AM Sigalit Eliazov <e.siga...@gmail.com>
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> I am trying to understand the effect of schema registry on our
>>>> pipeline's performance. In order to do so, we created a very simple
>>>> pipeline that reads from Kafka, runs a simple transformation that adds
>>>> a new field, and writes to Kafka. The messages are in Avro format.
>>>>
>>>> I ran this pipeline with 3 different options on the same configuration:
>>>> 1 Kafka partition, 1 task manager, 1 slot, 1 parallelism:
>>>>
>>>> * when I used Apicurio as the schema registry I was able to process
>>>> only 2000 messages per second
>>>> * when I used Confluent Schema Registry I was able to process 7000
>>>> messages per second
>>>> * when I did not use any schema registry and used a plain Avro
>>>> deserializer/serializer I was able to process *30K* messages per
>>>> second.
>>>>
>>>> I understand that using a schema registry may cause a reduction in
>>>> performance, but in my opinion the difference is too high.
>>>> Any comments or suggestions about these results?
>>>>
>>>> Thanks in advance
>>>> Sigalit