[ 
https://issues.apache.org/jira/browse/FLINK-14528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Peng updated FLINK-14528:
-------------------------
    Summary: Add a constructor for FlinkKafkaProducer  (was: Add a Constructor 
for FlinkKafkaProducer)

> Add a constructor for FlinkKafkaProducer
> ----------------------------------------
>
>                 Key: FLINK-14528
>                 URL: https://issues.apache.org/jira/browse/FLINK-14528
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>    Affects Versions: 1.9.0
>            Reporter: Peng
>            Priority: Minor
>
> In Flink 1.9.0, the defaultTopic parameter is required by the 
> FlinkKafkaProducer constructor shown below. When a KafkaSerializationSchema 
> is supplied, however, the parameter is not necessary. It is also confusing 
> for developers, because the schema can specify a different topic name.
> {code:java}
> public FlinkKafkaProducer( String defaultTopic, 
>                            KafkaSerializationSchema<IN> serializationSchema, 
>                            Properties producerConfig, 
>                            FlinkKafkaProducer.Semantic semantic)
> {code}
> For example, the topic name is set to bar in the constructor:
> {code:java}
> input.addSink(new FlinkKafkaProducer<>(
>         "bar",
>         new KafkaSerializationSchemaImpl(),
>         properties,
>         FlinkKafkaProducer.Semantic.AT_LEAST_ONCE));
> {code}
> However, the records are actually sent to topic baz, which is set in the 
> KafkaSerializationSchema implementation:
> {code:java}
> public class KafkaSerializationSchemaImpl
>         implements KafkaSerializationSchema<KafkaEvent> {
>     @Override
>     public ProducerRecord<byte[], byte[]> serialize(
>             KafkaEvent event, @Nullable Long timestamp) {
>         // The target topic is chosen here, per record.
>         return new ProducerRecord<>("baz", event.toString().getBytes());
>     }
> }
> {code}
> So I suggest adding a new constructor like the one below:
> {code:java}
> public FlinkKafkaProducer( KafkaSerializationSchema<IN> serializationSchema,
>                            Properties producerConfig, 
>                            FlinkKafkaProducer.Semantic semantic)
> {code}
> This is only my humble opinion; please correct me if I am wrong. Thanks in 
> advance.
>  
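The behavior described in the report can be illustrated with a minimal, dependency-free sketch. It does not use the real Flink or Kafka classes: SketchRecord, SketchSerializationSchema, SketchProducer and DefaultTopicSketch are hypothetical stand-ins, and the assumption that the sink forwards the schema's record unchanged is taken from the report, not from the Flink source.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for Kafka's ProducerRecord: a target topic plus a payload.
final class SketchRecord {
    final String topic;
    final byte[] value;

    SketchRecord(String topic, byte[] value) {
        this.topic = Objects.requireNonNull(topic, "topic");
        this.value = value;
    }
}

// Hypothetical stand-in for KafkaSerializationSchema<IN>: the schema picks the topic.
interface SketchSerializationSchema<IN> {
    SketchRecord serialize(IN element);
}

// Hypothetical stand-in for the sink; this is NOT FlinkKafkaProducer.
final class SketchProducer<IN> {
    private final String defaultTopic; // stored but never read in this sketch
    private final SketchSerializationSchema<IN> schema;
    final List<SketchRecord> sent = new ArrayList<>();

    // Mirrors the existing constructor shape: a default topic must be passed.
    SketchProducer(String defaultTopic, SketchSerializationSchema<IN> schema) {
        this.defaultTopic = defaultTopic;
        this.schema = Objects.requireNonNull(schema, "schema");
    }

    // Mirrors the proposed constructor shape: no default topic argument.
    SketchProducer(SketchSerializationSchema<IN> schema) {
        this(null, schema);
    }

    // Assumption from the report: the record built by the schema is sent as-is.
    void invoke(IN element) {
        sent.add(schema.serialize(element));
    }
}

final class DefaultTopicSketch {
    // Schema that always routes to "baz", like the example in the report.
    private static SketchRecord toBaz(String event) {
        return new SketchRecord("baz", event.getBytes(StandardCharsets.UTF_8));
    }

    static String topicWithDefault() {
        SketchProducer<String> producer =
                new SketchProducer<String>("bar", DefaultTopicSketch::toBaz);
        producer.invoke("event");
        return producer.sent.get(0).topic;
    }

    static String topicWithoutDefault() {
        SketchProducer<String> producer =
                new SketchProducer<String>(DefaultTopicSketch::toBaz);
        producer.invoke("event");
        return producer.sent.get(0).topic;
    }

    public static void main(String[] args) {
        System.out.println(topicWithDefault());    // prints "baz", not "bar"
        System.out.println(topicWithoutDefault()); // prints "baz"
    }
}
```

In this sketch both constructors produce the same routing, which is the point of the proposal: when the schema decides the topic, the "bar" argument only adds a value that a reader might mistake for the destination.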



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
