OK, it looks like you've already found that solution, judging by your
question in [1].

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Proper-way-to-get-DataStream-lt-GenericRecord-gt-td42640.html
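
As a side note on the quoted trace below: its innermost frame is
Collections$UnmodifiableCollection.add, reached from Kryo's
CollectionSerializer.read while KryoSerializer.copy copies the record. The
following is only a minimal JDK-only sketch of that failure mode. It uses
neither Kryo nor Avro, and the class name UnmodifiableAddDemo and the helper
rejectsAdd are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class UnmodifiableAddDemo {

    // Returns true if add() on the given collection is rejected with
    // UnsupportedOperationException, false if the element is accepted.
    public static boolean rejectsAdd(Collection<String> target) {
        try {
            target.add("x");
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> backing = new ArrayList<>();
        // Read-only view: every mutating call throws UnsupportedOperationException.
        Collection<String> readOnly = Collections.unmodifiableCollection(backing);

        System.out.println("ArrayList rejects add: " + rejectsAdd(new ArrayList<>()));
        System.out.println("unmodifiable view rejects add: " + rejectsAdd(readOnly));
    }
}
```

Any generic copier that rebuilds such a collection by calling add() on it
would hit the same exception, which is consistent with the "Caused by"
section of the trace.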

On Wed, Mar 31, 2021 at 1:26 PM Matthias Pohl <matth...@ververica.com>
wrote:

> Hi Maminspapin,
> I haven't worked with Kafka/Flink yet, but have you had a look at the
> docs about the DeserializationSchema [1]? They
> mention ConfluentRegistryAvroDeserializationSchema. Is this what
> you're looking for?
>
> Best,
> Matthias
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#the-deserializationschema
>
> On Tue, Mar 30, 2021 at 6:55 AM Maminspapin <un...@mail.ru> wrote:
>
>> I tried this:
>>
>> 1. Schema (found in stackoverflow)
>>
>> class GenericRecordSchema implements KafkaDeserializationSchema<GenericRecord> {
>>
>>     private String registryUrl;
>>     private transient KafkaAvroDeserializer deserializer;
>>
>>     public GenericRecordSchema(String registryUrl) {
>>         this.registryUrl = registryUrl;
>>     }
>>
>>     @Override
>>     public boolean isEndOfStream(GenericRecord nextElement) {
>>         return false;
>>     }
>>
>>     @Override
>>     public GenericRecord deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
>>         checkInitialized();
>>         return (GenericRecord) deserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
>>     }
>>
>>     @Override
>>     public TypeInformation<GenericRecord> getProducedType() {
>>         return TypeExtractor.getForClass(GenericRecord.class);
>>     }
>>
>>     private void checkInitialized() {
>>         if (deserializer == null) {
>>             Map<String, Object> props = new HashMap<>();
>>             props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
>>             props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
>>             SchemaRegistryClient client =
>>                     new CachedSchemaRegistryClient(
>>                             registryUrl,
>>                             AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>>             deserializer = new KafkaAvroDeserializer(client, props);
>>         }
>>     }
>> }
>>
>> 2. Consumer
>>
>> private static FlinkKafkaConsumer<GenericRecord> getConsumer(String topic) {
>>
>>     return new FlinkKafkaConsumer<>(
>>             topic,
>>             new GenericRecordSchema("http://xxx.xx.xxx.xx:8081"),
>>             getConsumerProperties());
>> }
>>
>> But when I start the app, the following error occurs:
>>
>> com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
>> Serialization trace:
>> reserved (org.apache.avro.Schema$Field)
>> fieldMap (org.apache.avro.Schema$RecordSchema)
>> schema (org.apache.avro.generic.GenericData$Record)
>>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>         at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>>         at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:273)
>>         at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
>>         at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
>>         at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
>>         at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
>>         at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
>>         at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
>>         at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
>>         at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
>>         at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
>>         at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
>>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
>>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>>         at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)
>> Caused by: java.lang.UnsupportedOperationException
>>         at java.util.Collections$UnmodifiableCollection.add(Collections.java:1057)
>>         at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
>>         at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>         ... 26 more
>>
>>
>> Setting the following does not solve it:
>> env.getConfig().disableForceKryo();
>> env.getConfig().enableForceAvro();
>>
>>
>> Any idea?
>>
>> Thanks
>
>
