[ https://issues.apache.org/jira/browse/FLINK-14528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Peng updated FLINK-14528: ------------------------- Summary: Add a constructor for FlinkKafkaProducer (was: Add a Constructor for FlinkKafkaProducer) > Add a constructor for FlinkKafkaProducer > ---------------------------------------- > > Key: FLINK-14528 > URL: https://issues.apache.org/jira/browse/FLINK-14528 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka > Affects Versions: 1.9.0 > Reporter: Peng > Priority: Minor > > In flink 1.9.0, defaultTopic param is required for FlinkKafkaProducer > constructor. > In fact, if I use the below constructor, it is not necessary. Furthermore, it > is confused for developer to specify different topic name. > {code:java} > public FlinkKafkaProducer( String defaultTopic, > KafkaSerializationSchema<IN> serializationSchema, > Properties producerConfig, > FlinkKafkaProducer.Semantic semantic) > {code} > For example, set topic name to bar in the constructor. > {code:java} > input.addSink( new FlinkKafkaProducer<>( > "bar", > new KafkaSerializationSchemaImpl(), > properties, > > FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)); > {code} > But actually send records to topic baz from KafkaSerializationSchema. > {code:java} > public class KafkaSerializationSchemaImpl implements > KafkaSerializationSchema<KafkaEvent> > { > @Override > public ProducerRecord<byte[], byte[]> serialize(KafkaEvent event, > @Nullable Long timestamp) { > return new ProducerRecord<>("baz", event.toString().getBytes()); > } > } > {code} > So I suggest add a new constructor like below. > {code:java} > public FlinkKafkaProducer( KafkaSerializationSchema<IN> serializationSchema, > Properties producerConfig, > FlinkKafkaProducer.Semantic semantic) > {code} > It is my humble opinion, please correct me, thanks in advance. > -- This message was sent by Atlassian Jira (v8.3.4#803005)