各位好,我想实现自定义partition,继承了 FlinkKafkaPartitioner 后,使用会报错,简化的代码如下。 //自定义partition public class customPartitioner extends FlinkKafkaPartitioner<String> { @Override public int partition(String record, byte[] key, byte[] value, String targetTopic, int[] partitions) { return 0; } }
DataStream<String> stream = 。。。 FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>( "test_topic", new SimpleStringSchema(), properties, new customPartitioner() ); stream.addSink(myProducer); //上面的代码,编辑器中编译FlinkKafkaProducer会报错,【Error:(55, 49) java: 无法推断org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer<>的类型参数】 //去掉new customPartitioner(),不使用自定义partition,FlinkKafkaProducer就不报错,感觉是构造函数对应不上,但是查看构造函数源码有这个构造函数 查看FlinkKafkaProducer源码如下,我上面的写法有问题么? public FlinkKafkaProducer( String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<IN>> customPartitioner) { this( topicId, serializationSchema, producerConfig, customPartitioner.orElse(null), Semantic.AT_LEAST_ONCE, DEFAULT_KAFKA_PRODUCERS_POOL_SIZE); }