Re: how to get topic names in SinkFunction when using FlinkKafkaConsumer010 with multiple topics

2017-07-16 Thread Tzu-Li (Gordon) Tai
Hi, Here’s an example: DataStream inputStream = …; inputStream.addSink(new FlinkKafkaProducer09<>("defaultTopic", new CustomKeyedSerializationSchema(), props)); Code for CustomKeyedSerializationSchema: public class CustomKeyedSerializationSchema implements KeyedSerializationSchema {

Re: how to get topic names in SinkFunction when using FlinkKafkaConsumer010 with multiple topics

2017-07-06 Thread Richard Xin
Thanks, I'm not sure I understand this. What I need is a single process that subscribes to multiple Kafka topics, with a switch clause for the different topics in my SinkFunction. Do I need to change the way the Kafka producer produces the messages? Any pointer to code samples will be

Re: how to get topic names in SinkFunction when using FlinkKafkaConsumer010 with multiple topics

2017-07-05 Thread Tzu-Li (Gordon) Tai
Hi Richard, Producing to multiple topics is treated a bit differently in the Flink Kafka producer. You need to set a single default target topic, and in `KeyedSerializationSchema#getTargetTopic()` you can override the default topic with whatever is returned. The `getTargetTopic` method is
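
A minimal sketch of the routing idea described above, assuming each record carries its own destination topic. To keep it compilable without Flink on the classpath, the interface below is a local stand-in that mirrors the three methods of Flink's `KeyedSerializationSchema`; `RoutedMessage` and `RoutingSerializationSchema` are hypothetical names introduced only for illustration.

```java
import java.nio.charset.StandardCharsets;

// Local stand-in mirroring Flink's KeyedSerializationSchema (assumption: in a
// real job you would implement the Flink interface of the same name instead).
interface KeyedSerializationSchema<T> {
    byte[] serializeKey(T element);
    byte[] serializeValue(T element);
    String getTargetTopic(T element);
}

// Hypothetical record type: a payload plus the topic it should be written to.
class RoutedMessage {
    final String topic;   // may be null, meaning "use the producer's default topic"
    final String payload;

    RoutedMessage(String topic, String payload) {
        this.topic = topic;
        this.payload = payload;
    }
}

// Hypothetical schema that routes each record to the topic it carries.
class RoutingSerializationSchema implements KeyedSerializationSchema<RoutedMessage> {
    @Override
    public byte[] serializeKey(RoutedMessage element) {
        return null; // this sketch writes records without a Kafka key
    }

    @Override
    public byte[] serializeValue(RoutedMessage element) {
        return element.payload.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(RoutedMessage element) {
        // Returning null leaves the decision to the producer's default topic.
        return element.topic;
    }
}
```

With the real Flink interface, an instance of such a schema would be passed to the producer constructor together with the default topic, as in the `FlinkKafkaProducer09` example elsewhere in this thread.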

how to get topic names in SinkFunction when using FlinkKafkaConsumer010 with multiple topics

2017-07-05 Thread Richard Xin
When using FlinkKafkaConsumer010 to subscribe to multiple topics, as in List<String> topics = Arrays.asList("test1", "test2"); DataStream<String> stream = env.addSource(new FlinkKafkaConsumer010<>(topics, new SimpleStringSchema(), properties)); how do I get the topic names in my SinkFunction, i.e. in stream.addSink()?
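
One possible approach on the consumer side, sketched under assumptions: `SimpleStringSchema` only sees the message bytes, whereas Flink's `KeyedDeserializationSchema#deserialize` also receives the topic, so the topic can be attached to each record and switched on later in the sink. The interface below is a local stand-in that mirrors only part of the Flink interface (the real one also declares `getProducedType()`); `TopicAndValue`, `TopicAwareSchema`, and `TopicSwitch` are hypothetical names.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Local stand-in mirroring two methods of Flink's KeyedDeserializationSchema
// (assumption: a real job implements the Flink interface, including getProducedType()).
interface KeyedDeserializationSchema<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
            throws IOException;

    boolean isEndOfStream(T nextElement);
}

// Hypothetical record type that keeps the source topic next to the value.
class TopicAndValue {
    final String topic;
    final String value;

    TopicAndValue(String topic, String value) {
        this.topic = topic;
        this.value = value;
    }
}

// Hypothetical schema that tags every record with the topic it was read from.
class TopicAwareSchema implements KeyedDeserializationSchema<TopicAndValue> {
    @Override
    public TopicAndValue deserialize(byte[] messageKey, byte[] message, String topic,
                                     int partition, long offset) {
        // Assumes a non-null UTF-8 encoded message body.
        return new TopicAndValue(topic, new String(message, StandardCharsets.UTF_8));
    }

    @Override
    public boolean isEndOfStream(TopicAndValue nextElement) {
        return false; // the stream is treated as unbounded
    }
}

// Hypothetical per-topic handling, as it might appear inside a SinkFunction's invoke().
class TopicSwitch {
    static String handle(TopicAndValue record) {
        switch (record.topic) {
            case "test1":
                return "test1 handler: " + record.value;
            case "test2":
                return "test2 handler: " + record.value;
            default:
                return "unhandled topic " + record.topic;
        }
    }
}
```

In a real job the schema would take the place of `SimpleStringSchema` in the `FlinkKafkaConsumer010` constructor, and the resulting stream would carry `TopicAndValue` elements instead of plain strings.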