Hi all
we have a single kafka topic which is used to receive 2 different types of
messages.
These 2 messages are Avro.
So when reading messages from kafka i used the GenericRecord
KafkaIO.<String, GenericRecord>read()
.withBootstrapServers(bootstrapServers)
.withTopic(topic)
.withConsumerConfigUpdates(ImmutableMap.of(
SerdeConfig.REGISTRY_URL, PipelineUtil.getSchemaURL(),
ConsumerConfig.GROUP_ID_CONFIG, consumerGroup,
SerdeConfig.CHECK_PERIOD_MS, TimeUnit.DAYS.toMillis(1)
))
.withKeyDeserializer(StringDeserializer.class)
I am not sure how to define the *withValueDeserializer* and coder.
i tried to read the message as GenericRecord but it fails with
"Could not extract the Kafka Deserializer type from class
io.apicurio.registry.serde.avro.AvroKafkaDeserialize"
i am using apicurio as the schema registry
Thanks
Sigalit