FlinkKafkaConsumer<ConsumerRecord<String, String>> consumer = new 
FlinkKafkaConsumer<>(

        topic,

        new CustomKafkaDeserializationSchema(),

        props

);







public class CustomKafkaDeserializationSchema implements 
KafkaDeserializationSchema<ConsumerRecord<String, String>> {

    @Override

    public boolean isEndOfStream(ConsumerRecord<String, String> nextElement) {

        return false;

    }

    @Override

    public ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], 
byte[]> record) throws Exception {

        ConsumerRecord<String, String> consumerRecord = new ConsumerRecord<>(

                record.topic(),

                record.partition(),

                record.offset(),

                new String(record.key(), "UTF-8"),

                new String(record.value(), "UTF-8")

        );

        return consumerRecord;

    }

    @Override

    public TypeInformation<ConsumerRecord<String, String>> getProducedType() {

        return TypeInformation.of(new TypeHint<ConsumerRecord<String, 
String>>() {

        });

    }

}

















在 2021-09-14 10:14:10,"Caizhi Weng" <tsreape...@gmail.com> 写道:
>Hi!
>
>邮件里看不到图片,请检查一下。
>
>从标题来看,是不是写了一个自己的 kafka deserialization schema,然后这个类是 abstract class,不能直接
>new?
>
>赢峰 <si_ji_f...@163.com> 于2021年9月13日周一 下午8:57写道:
>
>> 报错如下:
>>
>>
>> 代码如下:
>>
>>
>>
>> 签名由 网易邮箱大师 <https://mail.163.com/dashi/dlpro.html?from=mail81> 定制
>>
>>

回复