My guess was similar, but "org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider" uses "io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient", which I assumed would reduce this potential impact.
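To illustrate why a caching client should help, here is a minimal sketch of the idea in plain Java. It is not the Confluent client: `CachingSchemaLookup`, `schemaById`, and the simulated remote fetch are hypothetical names invented for this example. It only shows that repeated lookups of the same schema ID hit the (simulated) registry once per client instance.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntFunction;

/** Hypothetical stand-in for a caching schema registry client (not the Confluent API). */
class CachingSchemaLookup {
    // Simulates the expensive call to the registry (assumption: one call per schema ID).
    private final IntFunction<String> remoteFetch;
    private final Map<Integer, String> cache = new ConcurrentHashMap<>();
    private final AtomicInteger remoteCalls = new AtomicInteger();

    CachingSchemaLookup(IntFunction<String> remoteFetch) {
        this.remoteFetch = remoteFetch;
    }

    /** Returns the schema for an ID; the remote fetch runs only on the first lookup. */
    String schemaById(int id) {
        return cache.computeIfAbsent(id, key -> {
            remoteCalls.incrementAndGet();
            return remoteFetch.apply(key);
        });
    }

    /** Number of simulated registry calls made by this instance so far. */
    int remoteCalls() {
        return remoteCalls.get();
    }
}

class SchemaCacheSketch {
    public static void main(String[] args) {
        CachingSchemaLookup lookup =
            new CachingSchemaLookup(id -> "{\"type\":\"string\"}");
        // Many messages carrying the same schema ID.
        for (int i = 0; i < 30_000; i++) {
            lookup.schemaById(42);
        }
        System.out.println("remote calls: " + lookup.remoteCalls());
    }
}
```

Note that the cache lives inside one instance. If each reader or split created its own client, each would start with an empty cache, so how often the client is re-created matters as much as whether it caches.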
--
Alexey

> On 12 Apr 2023, at 17:36, John Casey via user <[email protected]> wrote:
>
> My initial guess is that there are queries being made in order to retrieve
> the schemas, which would impact performance, especially if those queries
> aren't cached with Beam splitting in mind.
>
> I'm looking to improve our interaction with Kafka schemas in the next couple
> of quarters, so I'll keep this case in mind while working on that.
>
> John
>
> On Tue, Apr 11, 2023 at 10:29 AM Alexey Romanenko <[email protected]> wrote:
>> I don’t have an exact answer for why it’s so much slower for now (only some
>> guesses, but it requires some profiling), though could you try to test the
>> same Kafka read but with “ConfluentSchemaRegistryDeserializerProvider”
>> instead of KafkaAvroDeserializer and AvroCoder?
>>
>> More details and an example of how to use it are here:
>> https://beam.apache.org/releases/javadoc/2.46.0/org/apache/beam/sdk/io/kafka/KafkaIO.html
>> (go to “Use Avro schema with Confluent Schema Registry”)
>>
>> --
>> Alexey
>>
>>> On 10 Apr 2023, at 07:35, Sigalit Eliazov <[email protected]> wrote:
>>>
>>> hi,
>>> KafkaIO.<String, T>read()
>>>     .withBootstrapServers(bootstrapServers)
>>>     .withTopic(topic)
>>>     .withConsumerConfigUpdates(Map.ofEntries(
>>>         Map.entry("schema.registry.url", registryURL),
>>>         Map.entry(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup + UUID.randomUUID())
>>>     ))
>>>     .withKeyDeserializer(StringDeserializer.class)
>>>     .withValueDeserializerAndCoder(
>>>         (Class) io.confluent.kafka.serializers.KafkaAvroDeserializer.class,
>>>         AvroCoder.of(avroClass));
>>>
>>> Thanks
>>> Sigalit
>>>
>>> On Mon, Apr 10, 2023 at 2:58 AM Reuven Lax via user <[email protected]> wrote:
>>>> How are you using the schema registry? Do you have a code sample?
>>>>
>>>> On Sun, Apr 9, 2023 at 3:06 AM Sigalit Eliazov <[email protected]> wrote:
>>>>> Hello,
>>>>>
>>>>> I am trying to understand the effect of schema registry on our pipeline's
>>>>> performance. In order to do so, we created a very simple pipeline that
>>>>> reads from Kafka, runs a simple transformation that adds a new field, and
>>>>> writes to Kafka. The messages are in Avro format.
>>>>>
>>>>> I ran this pipeline with 3 different options on the same configuration: 1
>>>>> Kafka partition, 1 task manager, 1 slot, parallelism of 1:
>>>>>
>>>>> * when I used Apicurio as the schema registry I was able to process only
>>>>>   2000 messages per second
>>>>> * when I used Confluent schema registry I was able to process 7000
>>>>>   messages per second
>>>>> * when I did not use any schema registry and used a plain Avro
>>>>>   deserializer/serializer I was able to process 30K messages per second.
>>>>>
>>>>> I understand that using a schema registry may cause a reduction in
>>>>> performance, but in my opinion the difference is too high.
>>>>> Any comments or suggestions about these results?
>>>>>
>>>>> Thanks in advance
>>>>> Sigalit
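
For reference, the throughput figures reported in the original message can be turned into an average cost per message, which makes the gap easier to reason about when profiling. The sketch below only does that arithmetic; `ThroughputCost` and `microsPerMessage` are names made up for this example, and the rates are the ones quoted in the thread (2000, 7000, and 30K messages per second on a single slot).

```java
class ThroughputCost {
    /** Average wall-clock time per message, in microseconds, for a given rate. */
    static double microsPerMessage(double messagesPerSecond) {
        return 1_000_000.0 / messagesPerSecond;
    }

    public static void main(String[] args) {
        double plain = microsPerMessage(30_000);    // plain Avro, no registry
        double confluent = microsPerMessage(7_000); // Confluent schema registry
        double apicurio = microsPerMessage(2_000);  // Apicurio schema registry

        System.out.printf("plain Avro: %.1f us/msg%n", plain);
        System.out.printf("Confluent:  %.1f us/msg (+%.1f us over plain)%n",
            confluent, confluent - plain);
        System.out.printf("Apicurio:   %.1f us/msg (+%.1f us over plain)%n",
            apicurio, apicurio - plain);
    }
}
```

This works out to roughly 33 microseconds per message without a registry, about 143 with Confluent, and 500 with Apicurio, so the registry paths add on the order of 110 and 467 microseconds per message respectively. These are averages over the whole pipeline, not measurements of the deserializer alone.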
