各位好,我想实现自定义partition,继承了 FlinkKafkaPartitioner 后,使用会报错,简化的代码如下。
//自定义partition
public class customPartitioner extends FlinkKafkaPartitioner<String> {
    @Override
    public int partition(String record, byte[] key, byte[] value, String 
targetTopic, int[] partitions) {
        return 0;
    }
}


DataStream<String> stream = 。。。
FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
                "test_topic",
                new SimpleStringSchema(),
                properties,
                new customPartitioner()
        );
stream.addSink(myProducer);


//上面的代码,编辑器中编译FlinkKafkaProducer会报错,【Error:(55, 49) java: 
无法推断org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer<>的类型参数】
//去掉new 
customPartitioner(),不使用自定义partition,FlinkKafkaProducer就不报错,感觉是构造函数对应不上,但是查看构造函数源码有这个构造函数




查看FlinkKafkaProducer源码如下,我上面的写法有问题么?
public FlinkKafkaProducer(
String topicId,
SerializationSchema<IN> serializationSchema,
Properties producerConfig,
Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
this(
topicId,
serializationSchema,
producerConfig,
customPartitioner.orElse(null),
Semantic.AT_LEAST_ONCE,
DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
}



回复