Yes, I use a Static Interceptor to solve that: agent_myAgent.sources.kafkaSource.interceptors = i1 agent_myAgent.sources.kafkaSource.interceptors.i1.type = static agent_myAgent.sources.kafkaSource.interceptors.i1.key = topic agent_myAgent.sources.kafkaSource.interceptors.i1.preserveExisting = false agent_myAgent.sources.kafkaSource.interceptors.i1.value = sinkTopic
Thanks for your reply. ------------------ ???????? ------------------ ??????: "Gonzalo Herreros";<[email protected]>; ????????: 2016??6??12??(??????) ????3:53 ??????: "user"<[email protected]>; ????: Re: Kafka Sink Topic was overwritten by Kafka Source Topic You need an interceptor to update/remove the topic header Gonzalo On Jun 12, 2016 4:57 AM, "lxw" <[email protected]> wrote: Hi,All: I use Kafka Source to read events from one Kafka topic and write events to another Topic with Kafka Sink, the Kafka Sink topic configuration is not work, flume still write events to Kafka source Topic (sourceTopic). agent_myAgent.sources.kafkaSource.topic = sourceTopic agent_myAgent.sinks.kafkaSink.topic = sinkTopic SourceCode in "org.apache.flume.source.kafka.KafkaSource.process()" : // Add headers to event (topic, timestamp, and key) headers = new HashMap<String, String>(); headers.put(KafkaSourceConstants.TIMESTAMP, String.valueOf(System.currentTimeMillis())); headers.put(KafkaSourceConstants.TOPIC, topic); and in Kafka Sink properties: The topic in Kafka to which the messages will be published. If this parameter is configured, messages will be published to this topic. If the event header contains a ??topic?? field, the event will be published to that topic overriding the topic configured here. SourceCode in "org.apache.flume.sink.kafka.KafkaSink.process()": if ((eventTopic = headers.get(TOPIC_HDR)) == null) { eventTopic = topic; } In my case, how I can fix this problem? Flume version is 1.6.0. Thanks!
