// Build the Flink Kafka source with the custom schema so that each element
// emitted downstream is a ConsumerRecord<String, String>.
FlinkKafkaConsumer<ConsumerRecord<String, String>> consumer = new FlinkKafkaConsumer<>(
        topic,
        new CustomKafkaDeserializationSchema(),
        props
);

/**
 * Deserialization schema that turns the raw {@code byte[]} key and value of a
 * Kafka record into UTF-8 strings and re-wraps them in a
 * {@code ConsumerRecord<String, String>}.
 *
 * <p>Only topic, partition, offset, key and value are copied into the new
 * record; no other field of the incoming record is passed to the constructor.
 */
public class CustomKafkaDeserializationSchema
        implements KafkaDeserializationSchema<ConsumerRecord<String, String>> {

    /**
     * Always reports that the stream continues.
     *
     * @param nextElement the most recently deserialized record (ignored)
     * @return {@code false} unconditionally
     */
    @Override
    public boolean isEndOfStream(ConsumerRecord<String, String> nextElement) {
        return false;
    }

    /**
     * Converts a raw Kafka record into a record with string key and value.
     *
     * @param record the raw record whose key and value are byte arrays
     * @return a new record with the same topic, partition and offset, whose
     *         key and value are the UTF-8 decoding of the raw bytes, or
     *         {@code null} where the raw key or value was {@code null}
     * @throws Exception declared to match the overridden interface method
     */
    @Override
    public ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], byte[]> record)
            throws Exception {
        // Decode through the helper: a raw key or value may be null, and
        // calling new String(null, ...) would throw a NullPointerException.
        return new ConsumerRecord<>(
                record.topic(),
                record.partition(),
                record.offset(),
                decodeUtf8(record.key()),
                decodeUtf8(record.value())
        );
    }

    /**
     * Describes the produced element type to Flink.
     *
     * @return type information for {@code ConsumerRecord<String, String>}
     */
    @Override
    public TypeInformation<ConsumerRecord<String, String>> getProducedType() {
        // NOTE(review): ConsumerRecord is presumably treated by Flink as a generic
        // (non-POJO) type here - confirm this is acceptable for the job's serializers.
        return TypeInformation.of(new TypeHint<ConsumerRecord<String, String>>() { });
    }

    /** Decodes {@code bytes} as UTF-8, passing {@code null} through unchanged. */
    private static String decodeUtf8(byte[] bytes) {
        // Fully qualified so the snippet needs no additional import line.
        return bytes == null ? null : new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
    }
}

在 2021-09-14 10:14:10,"Caizhi Weng" <tsreape...@gmail.com> 写道: >Hi! > >邮件里看不到图片,请检查一下。 > >从标题来看,是不是写了一个自己的 kafka deserialization schema,然后这个类是 abstract class,不能直接 >new? > >赢峰 <si_ji_f...@163.com> 于2021年9月13日周一 下午8:57写道: > >> 报错如下: >> >> >> 代码如下: >> >> >> >> 签名由 网易邮箱大师 <https://mail.163.com/dashi/dlpro.html?from=mail81> 定制 >> >>