Hi,
Here’s an example:
DataStream inputStream = …;
inputStream.addSink(new FlinkKafkaProducer09<>(
    "defaultTopic", new CustomKeyedSerializationSchema(), props));
Code for CustomKeyedSerializationSchema:
public class CustomKeyedSerializationSchema implements
    KeyedSerializationSchema {
Thanks, I'm not sure I understand this. What I need is a single process
subscribing to multiple Kafka topics, with a switch clause for the different
topics in my SinkFunction. Do I need to change how the Kafka producer
produces the messages? Any pointers to code samples will be
Hi Richard,
Producing to multiple topics is treated a bit differently in the Flink Kafka
producer.
You need to set a single default target topic, and whatever you return from
`KeyedSerializationSchema#getTargetTopic()` overrides that default for the
given record. The `getTargetTopic` method is
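
To make the topic override concrete, here is a minimal sketch rather than a definitive implementation. It assumes a hypothetical `RoutedEvent` type that carries its own target topic, and it uses a local stand-in interface (`KeyedSerializationSchemaSketch`) that mirrors the three methods of Flink's `KeyedSerializationSchema`, so it compiles without Flink on the classpath. In a real job you would implement Flink's interface instead.

```java
import java.nio.charset.StandardCharsets;

// Stand-in mirroring the three methods of Flink's KeyedSerializationSchema<T>.
// It exists only so this sketch compiles without the Flink dependency.
interface KeyedSerializationSchemaSketch<T> {
    byte[] serializeKey(T element);
    byte[] serializeValue(T element);
    String getTargetTopic(T element);
}

// Hypothetical record type: a payload plus the topic it should be written to.
final class RoutedEvent {
    final String targetTopic; // null means "no preference"
    final String payload;

    RoutedEvent(String targetTopic, String payload) {
        this.targetTopic = targetTopic;
        this.payload = payload;
    }
}

// Routes each record to the topic stored on the record itself.
final class TopicRoutingSchema implements KeyedSerializationSchemaSketch<RoutedEvent> {
    @Override
    public byte[] serializeKey(RoutedEvent element) {
        return null; // this sketch writes no Kafka message key
    }

    @Override
    public byte[] serializeValue(RoutedEvent element) {
        return element.payload.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(RoutedEvent element) {
        // Returning null leaves the choice to the producer's default topic.
        return element.targetTopic;
    }
}
```

With Flink's own interface, an instance of such a schema would be passed to the producer constructor in place of `CustomKeyedSerializationSchema` above, and records whose `getTargetTopic` result is null go to the default topic.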
When using FlinkKafkaConsumer010 to subscribe to multiple topics as
List<String> topics = Arrays.asList("test1", "test2");
DataStream<String> stream = env.addSource(new FlinkKafkaConsumer010<>(topics,
    new SimpleStringSchema(), properties));
How do I get topic names in my SinkFunction? i.e. stream.addSink()
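
One possible approach, sketched under assumptions: `SimpleStringSchema` only sees the message bytes, but Flink's `KeyedDeserializationSchema` receives the topic name in `deserialize(...)`, so the topic can be attached to each record before it reaches the sink. The stand-in interface `KeyedDeserializationSchemaSketch` and the `TopicAndValue` and `TopicSwitch` classes below are hypothetical names used only for illustration. The real Flink interface also declares `isEndOfStream(...)` and `getProducedType()`, which are left out here.

```java
import java.nio.charset.StandardCharsets;

// Stand-in mirroring the deserialize(...) method of Flink's KeyedDeserializationSchema<T>.
// The real interface also has isEndOfStream(...) and getProducedType(), omitted in this sketch.
interface KeyedDeserializationSchemaSketch<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset);
}

// Hypothetical record type carrying the source topic together with the decoded value.
final class TopicAndValue {
    final String topic;
    final String value;

    TopicAndValue(String topic, String value) {
        this.topic = topic;
        this.value = value;
    }
}

// Tags every consumed message with the topic it was read from.
final class TopicTaggingSchema implements KeyedDeserializationSchemaSketch<TopicAndValue> {
    @Override
    public TopicAndValue deserialize(byte[] messageKey, byte[] message,
                                     String topic, int partition, long offset) {
        return new TopicAndValue(topic, new String(message, StandardCharsets.UTF_8));
    }
}

// Hypothetical per-topic logic of the kind a SinkFunction's invoke(...) could run.
final class TopicSwitch {
    static String handle(TopicAndValue record) {
        switch (record.topic) {
            case "test1":
                return "handled by test1 branch: " + record.value;
            case "test2":
                return "handled by test2 branch: " + record.value;
            default:
                return "unhandled topic: " + record.topic;
        }
    }
}
```

In an actual job the schema would implement Flink's `KeyedDeserializationSchema` and replace `new SimpleStringSchema()` in the consumer constructor, so the stream carries `TopicAndValue` elements and the sink can switch on `record.topic`.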