OK, it looks like you've already found that solution, judging by your question in [1].

[1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Proper-way-to-get-DataStream-lt-GenericRecord-gt-td42640.html

On Wed, Mar 31, 2021 at 1:26 PM Matthias Pohl <matth...@ververica.com> wrote:

> Hi Maminspapin,
> I haven't worked with Kafka/Flink yet. But have you had a look at the
> docs about the DeserializationSchema [1]? It mentions
> ConfluentRegistryAvroDeserializationSchema. Is this something you're
> looking for?
>
> Best,
> Matthias
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-deserializationschema
>
> On Tue, Mar 30, 2021 at 6:55 AM Maminspapin <un...@mail.ru> wrote:
>
>> I tried this:
>>
>> 1. Schema (found on Stack Overflow)
>>
>> class GenericRecordSchema implements KafkaDeserializationSchema<GenericRecord> {
>>
>>     private String registryUrl;
>>     private transient KafkaAvroDeserializer deserializer;
>>
>>     public GenericRecordSchema(String registryUrl) {
>>         this.registryUrl = registryUrl;
>>     }
>>
>>     @Override
>>     public boolean isEndOfStream(GenericRecord nextElement) {
>>         return false;
>>     }
>>
>>     @Override
>>     public GenericRecord deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
>>         checkInitialized();
>>         return (GenericRecord) deserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
>>     }
>>
>>     @Override
>>     public TypeInformation<GenericRecord> getProducedType() {
>>         return TypeExtractor.getForClass(GenericRecord.class);
>>     }
>>
>>     private void checkInitialized() {
>>         if (deserializer == null) {
>>             Map<String, Object> props = new HashMap<>();
>>             props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
>>             props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
>>             SchemaRegistryClient client =
>>                 new CachedSchemaRegistryClient(
>>                     registryUrl,
>>                     AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>>             deserializer = new KafkaAvroDeserializer(client, props);
>>         }
>>     }
>> }
>>
>> 2. Consumer
>>
>> private static FlinkKafkaConsumer<GenericRecord> getConsumer(String topic) {
>>
>>     return new FlinkKafkaConsumer<>(
>>         topic,
>>         new GenericRecordSchema("http://xxx.xx.xxx.xx:8081"),
>>         getConsumerProperties());
>> }
>>
>> But when I start the app, the following error occurs:
>>
>> com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
>> Serialization trace:
>> reserved (org.apache.avro.Schema$Field)
>> fieldMap (org.apache.avro.Schema$RecordSchema)
>> schema (org.apache.avro.generic.GenericData$Record)
>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273)
>>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
>>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
>>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
>>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
>>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
>>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
>>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
>>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)
>> Caused by: java.lang.UnsupportedOperationException
>>     at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>     ... 26 more
>>
>> This does not solve it:
>>
>> env.getConfig().disableForceKryo();
>> env.getConfig().enableForceAvro();
>>
>> Any idea?
>>
>> Thanks
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
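
For what it's worth, the "Caused by" part of the quoted trace can be reproduced without Flink, Kryo or Avro. Going only by the frames shown, CollectionSerializer.read ends up calling add() on a java.util.Collections$UnmodifiableCollection while KryoSerializer.copy is copying the record's schema. The sketch below mimics just that step in plain Java. The class UnmodifiableCopyDemo and the helper fillByAdd are hypothetical names made up for illustration; they are not Kryo's implementation, only a stand-in for "fill an existing collection through add()".

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;

public class UnmodifiableCopyDemo {

    /**
     * Hypothetical stand-in for a generic copier: it receives an already
     * constructed target collection and fills it element by element via add().
     * Returns true if every add() succeeded, false if the target rejected it.
     */
    public static <T> boolean fillByAdd(Collection<T> target, List<T> elements) {
        try {
            for (T element : elements) {
                target.add(element);
            }
            return true;
        } catch (UnsupportedOperationException e) {
            // Same exception type as the "Caused by" entry in the quoted trace.
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> elements = Arrays.asList("a", "b");

        // A plain HashSet accepts add(), so filling it works.
        System.out.println(fillByAdd(new HashSet<String>(), elements));

        // An unmodifiable view rejects add() with UnsupportedOperationException.
        Collection<String> readOnly = Collections.unmodifiableSet(new HashSet<String>());
        System.out.println(fillByAdd(readOnly, elements));
    }
}
```

If that reading of the trace is right, the failure comes from the records being copied through the generic Kryo path at all, which is presumably why an Avro-aware schema such as the ConfluentRegistryAvroDeserializationSchema mentioned above is worth trying.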