Peng created FLINK-14528:
----------------------------

             Summary: Add a Constructor for FlinkKafkaProducer
                 Key: FLINK-14528
                 URL: https://issues.apache.org/jira/browse/FLINK-14528
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / Kafka
    Affects Versions: 1.9.0
            Reporter: Peng

In Flink 1.9.0, the defaultTopic parameter is required by the FlinkKafkaProducer constructor. In fact, when I use the constructor below, it is not necessary. Furthermore, it is confusing for developers, because a different topic name can be specified in each place.

{code:java}
public FlinkKafkaProducer(
        String defaultTopic,
        KafkaSerializationSchema<IN> serializationSchema,
        Properties producerConfig,
        FlinkKafkaProducer.Semantic semantic)
{code}

For example, set the topic name to bar in the constructor:

{code:java}
input.addSink(
        new FlinkKafkaProducer<>(
                "bar",
                new KafkaSerializationSchemaImpl(),
                properties,
                FlinkKafkaProducer.Semantic.AT_LEAST_ONCE));
{code}

But the records are actually sent to topic baz, which is set in the KafkaSerializationSchema:

{code:java}
import java.nio.charset.StandardCharsets;
import javax.annotation.Nullable;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaSerializationSchemaImpl implements KafkaSerializationSchema<KafkaEvent> {
    @Override
    public ProducerRecord<byte[], byte[]> serialize(KafkaEvent event, @Nullable Long timestamp) {
        // The topic is chosen here, per record, not by the constructor argument.
        return new ProducerRecord<>("baz", event.toString().getBytes(StandardCharsets.UTF_8));
    }
}
{code}

So I suggest adding a new constructor like the one below:

{code:java}
public FlinkKafkaProducer(
        KafkaSerializationSchema<IN> serializationSchema,
        Properties producerConfig,
        FlinkKafkaProducer.Semantic semantic)
{code}

This is just my humble opinion; please correct me if I am wrong. Thanks in advance.
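
As a side note, until such a constructor exists, one possible way to keep the two topic names from drifting apart is to declare the topic once and hand the same value to both the schema and the defaultTopic argument. The following is only a minimal sketch of that idea. RecordStub, SchemaStub, TopicAwareSchema and TopicRoutingSketch are hypothetical stand-ins invented here so that the example compiles without Flink or Kafka on the classpath; they are not part of either library and only mimic the shape of ProducerRecord and KafkaSerializationSchema.

```java
import java.nio.charset.StandardCharsets;
import java.util.Objects;

// Hypothetical stand-in for ProducerRecord<byte[], byte[]>: only topic and value are modelled.
final class RecordStub {
    final String topic;
    final byte[] value;

    RecordStub(String topic, byte[] value) {
        this.topic = topic;
        this.value = value;
    }
}

// Hypothetical stand-in for KafkaSerializationSchema<T>, reduced to its serialize method.
interface SchemaStub<T> {
    RecordStub serialize(T element, Long timestamp);
}

// A schema that receives its target topic as a constructor argument
// instead of hard-coding it inside serialize().
final class TopicAwareSchema implements SchemaStub<String> {
    private final String topic;

    TopicAwareSchema(String topic) {
        this.topic = Objects.requireNonNull(topic, "topic");
    }

    @Override
    public RecordStub serialize(String element, Long timestamp) {
        // Every record is addressed to the single topic given at construction time.
        return new RecordStub(topic, element.getBytes(StandardCharsets.UTF_8));
    }
}

final class TopicRoutingSketch {
    public static void main(String[] args) {
        // Declared once; the same value would also be passed as defaultTopic.
        String topic = "baz";
        SchemaStub<String> schema = new TopicAwareSchema(topic);
        RecordStub record = schema.serialize("hello", null);
        System.out.println(record.topic); // prints "baz"
    }
}
```

With the real classes, the same topic variable would be passed as the first argument of the four-argument FlinkKafkaProducer constructor shown above and to the schema's constructor, so the two names cannot disagree. This only works around the confusion; it does not remove the redundant parameter.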