Hi! Optional.of(new customPartitioner())
Ye Chen wrote > 各位好,我想实现自定义partition,继承了 FlinkKafkaPartitioner 后,使用会报错,简化的代码如下。 > //自定义partition > public class customPartitioner extends FlinkKafkaPartitioner > <String> > { > @Override > public int partition(String record, byte[] key, byte[] value, String > targetTopic, int[] partitions) { > return 0; > } > } > > > DataStream > <String> > stream = 。。。 > FlinkKafkaProducer > <String> > myProducer = new FlinkKafkaProducer<>( > "test_topic", > new SimpleStringSchema(), > properties, > new customPartitioner() > ); > stream.addSink(myProducer); > > > //上面的代码,编辑器中编译FlinkKafkaProducer会报错,【Error:(55, 49) java: > 无法推断org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer<>的类型参数】 > //去掉new > customPartitioner(),不使用自定义partition,FlinkKafkaProducer就不报错,感觉是构造函数对应不上,但是查看构造函数源码有这个构造函数 > > > > > 查看FlinkKafkaProducer源码如下,我上面的写法有问题么? > public FlinkKafkaProducer( > String topicId, > SerializationSchema > <IN> > serializationSchema, > Properties producerConfig, > Optional<FlinkKafkaPartitioner<IN>> customPartitioner) { > this( > topicId, > serializationSchema, > producerConfig, > customPartitioner.orElse(null), > Semantic.AT_LEAST_ONCE, > DEFAULT_KAFKA_PRODUCERS_POOL_SIZE); > } -- Sent from: http://apache-flink.147419.n8.nabble.com/