Hi!

    Optional.of(new customPartitioner())



Ye Chen wrote
> 各位好,我想实现自定义partition,继承了 FlinkKafkaPartitioner 后,使用会报错,简化的代码如下。
> //自定义partition
> public class customPartitioner extends FlinkKafkaPartitioner
> <String>
>  {
>     @Override
>     public int partition(String record, byte[] key, byte[] value, String
> targetTopic, int[] partitions) {
>         return 0;
>     }
> }
> 
> 
> DataStream
> <String>
>  stream = 。。。
> FlinkKafkaProducer
> <String>
>  myProducer = new FlinkKafkaProducer<>(
>                 "test_topic",
>                 new SimpleStringSchema(),
>                 properties,
>                 new customPartitioner()
>         );
> stream.addSink(myProducer);
> 
> 
> //上面的代码,编辑器中编译FlinkKafkaProducer会报错,【Error:(55, 49) java:
> 无法推断org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer<>的类型参数】
> //去掉new
> customPartitioner(),不使用自定义partition,FlinkKafkaProducer就不报错,感觉是构造函数对应不上,但是查看构造函数源码有这个构造函数
> 
> 
> 
> 
> 查看FlinkKafkaProducer源码如下,我上面的写法有问题么?
> public FlinkKafkaProducer(
> String topicId,
> SerializationSchema
> <IN>
>  serializationSchema,
> Properties producerConfig,
> Optional&lt;FlinkKafkaPartitioner&lt;IN&gt;> customPartitioner) {
> this(
> topicId,
> serializationSchema,
> producerConfig,
> customPartitioner.orElse(null),
> Semantic.AT_LEAST_ONCE,
> DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
> }





--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复