Hi,

It would be helpful if you could provide the full stack trace and tell us 
which Flink version and which Kafka connector version you are using.

It sounds like either a dependency convergence error (mixing different Kafka 
dependencies or different versions of flink-connector-kafka inside a single 
job/jar) or a shading issue. Can you check your project for such issues, e.g. 
with the `mvn dependency:tree` command [1]?
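
For example (just a sketch; the includes pattern filters by groupId), this 
should list every artifact in your build that pulls in Kafka classes:

    mvn dependency:tree -Dincludes=org.apache.kafka

If more than one Kafka version shows up there (or, in the unfiltered output, 
more than one version of flink-connector-kafka), that would explain the 
mismatch.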

Also, what’s a bit suspicious to me is the return type in the error:

> Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/producer/ProducerRecord;

An AbstractMethodError usually means the class was compiled against a 
different version of the interface than the one present at runtime. I’m not 
sure, but I wasn’t aware that we shade the Kafka dependency in our connectors. 
Are you manually shading something?
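
One quick way to check (a sketch, assuming your job jar is called myjob.jar) 
is to list what actually ends up inside it:

    jar tf myjob.jar | grep ProducerRecord

If that shows both org/apache/kafka/clients/producer/ProducerRecord.class and 
a copy under org/apache/flink/kafka/shaded/, then two relocated copies of the 
Kafka classes are in play, which would match the error below.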

Piotrek

[1] 
https://maven.apache.org/plugins/maven-dependency-plugin/examples/resolving-conflicts-using-the-dependency-tree.html

> On 22 May 2020, at 15:34, wangl...@geekplus.com.cn wrote:
> 
> 
> import java.nio.charset.StandardCharsets;
> import javax.annotation.Nullable;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic;
> import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
> import org.apache.kafka.clients.producer.ProducerRecord;
> 
> public class MyKafkaSerializationSchema implements
>         KafkaSerializationSchema<Tuple2<String, String>> {
>     @Override
>     public ProducerRecord<byte[], byte[]> serialize(Tuple2<String, String> o,
>             @Nullable Long aLong) {
>         // Topic name from the first tuple field, record value from the second.
>         return new ProducerRecord<>(o.f0, o.f1.getBytes(StandardCharsets.UTF_8));
>     }
> }
> 
> FlinkKafkaProducer<Tuple2<String, String>> producer = new FlinkKafkaProducer<>(
>     "default", new MyKafkaSerializationSchema(), prop2, Semantic.EXACTLY_ONCE);
> 
> But there's an error when running:
> 
> java.lang.AbstractMethodError: 
> com.geekplus.flinketl.schema.MyKafkaSerializationSchema.serialize(Ljava/lang/Object;Ljava/lang/Long;)Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/producer/ProducerRecord;
> 
> Any suggestion on this?
> 
> Thanks,
> Lei
> wangl...@geekplus.com.cn
