Peng created FLINK-14528:
----------------------------

             Summary: Add a Constructor for FlinkKafkaProducer
                 Key: FLINK-14528
                 URL: https://issues.apache.org/jira/browse/FLINK-14528
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / Kafka
    Affects Versions: 1.9.0
            Reporter: Peng


In flink 1.9.0, defaultTopic param is required for FlinkKafkaProducer 
constructor.

In fact, if I use the below constructor, it is not necessary. Furthermore, it 
is confused for developer to specify different topic name.
{code:java}
public FlinkKafkaProducer( String defaultTopic, 
                           KafkaSerializationSchema<IN> serializationSchema, 
                           Properties producerConfig, 
                           FlinkKafkaProducer.Semantic semantic)
{code}
For example, set topic name to bar in the constructor.
{code:java}
input.addSink( new FlinkKafkaProducer<>( 
                                     "bar", 
                                     new KafkaSerializationSchemaImpl(), 
                                     properties, 
                                     
FlinkKafkaProducer.Semantic.AT_LEAST_ONCE));
{code}
But actually send records to topic baz from KafkaSerializationSchema. 
{code:java}
public class KafkaSerializationSchemaImpl implements 
KafkaSerializationSchema<KafkaEvent> 
{    
    @Override    
    public ProducerRecord<byte[], byte[]> serialize(KafkaEvent event, @Nullable 
Long timestamp) {
        return new ProducerRecord<>("baz", event.toString().getBytes());
    }
}
{code}
So I suggest add a new constructor like below.
{code:java}
public FlinkKafkaProducer( KafkaSerializationSchema<IN> serializationSchema,
                           Properties producerConfig, 
                           FlinkKafkaProducer.Semantic semantic)
{code}
It is my humble opinion, please correct me, thanks in advance.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to