[ https://issues.apache.org/jira/browse/FLUME-2814?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Li Ye updated FLUME-2814: ------------------------- Attachment: FLUME-2789-1.patch This issue is similar to Flume-2789. An optinal property called ignoreTopicInHeader is added for Kafka Sink. Its default value is false, so it is compatible with Flume 1.6.0. If you want to ignore topic in header and write events to the topic you specified in properties file, you can set ignoreTopicInHeader to true. Besides, three optinal properties topicHeader, keyHeader, timestampHeader are added for Kafka Sink. They are similar to fileHeader and basenameHeader for Spooling Directory Source. Their default value are true, so they are compatible with Flume 1.6.0. If you do not want to add headers storing topic, key or timestamp, you can set them to false. > flume kafka sink does not write events to configured sink topic when source > is also from other topic of kafka > ------------------------------------------------------------------------------------------------------------- > > Key: FLUME-2814 > URL: https://issues.apache.org/jira/browse/FLUME-2814 > Project: Flume > Issue Type: Bug > Components: Sinks+Sources > Affects Versions: v1.5.0 > Reporter: Manohar > Attachments: FLUME-2789-1.patch > > > I was testing a case when flume agent is reading from kafka source from topic > 'sourcetopic' and sink configured to kafkasink but to other topic > 'destinationtopic', > tier1.sources = source1 > tier1.channels = channel1 > tier1.sinks = sink1 > tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource > tier1.sources.source1.channels = channel1 > tier1.sources.source1.zookeeperConnect = localhost:2181 > tier1.sources.source1.topic = sourcetopic > tier1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink > tier1.sinks.sink1.topic = destinationtopic > tier1.sinks.sink1.brokerList = localhost:9092 > tier1.sinks.sink1.channel = channel1 > tier1.channels.channel1.type = memory > tier1.channels.channel1.capacity = 10000 > tier1.channels.channel1.transactionCapacity = 1000 > With this settings i noticed that event were not written to > 'destinationtopic', > After debugging the agent if found that kafka source puts in topic name in > header. > headers.put(KafkaSourceConstants.TOPIC, topic); > and in sink check is made to see if headers contain topic, if exists then we > take topic name from header and write event that topic and there by > discarding configured sink topic i ,e destinationtopic. > here is code snippet that does, even though variable topic as > destinationtopic, since header had topic, kafka sink takes topic name from > header and puts event to that topic i,e again to source topic > if ((eventTopic = headers.get(TOPIC_HDR)) == null) { > eventTopic = topic; > } -- This message was sent by Atlassian JIRA (v6.3.4#6332)