[ https://issues.apache.org/jira/browse/FLUME-2814?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Li Ye updated FLUME-2814:
-------------------------
    Attachment: FLUME-2789-2.patch

> flume kafka sink does not write events to configured sink topic when source is also from other topic of kafka
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: FLUME-2814
>                 URL: https://issues.apache.org/jira/browse/FLUME-2814
>             Project: Flume
>          Issue Type: Bug
>          Components: Sinks+Sources
>    Affects Versions: v1.5.0
>            Reporter: Manohar
>         Attachments: FLUME-2789-1.patch, FLUME-2789-2.patch
>
>
> I was testing a case where a Flume agent reads from a Kafka source on topic 'sourcetopic' and the sink is a Kafka sink configured with a different topic, 'destinationtopic':
> tier1.sources = source1
> tier1.channels = channel1
> tier1.sinks = sink1
> tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
> tier1.sources.source1.channels = channel1
> tier1.sources.source1.zookeeperConnect = localhost:2181
> tier1.sources.source1.topic = sourcetopic
> tier1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
> tier1.sinks.sink1.topic = destinationtopic
> tier1.sinks.sink1.brokerList = localhost:9092
> tier1.sinks.sink1.channel = channel1
> tier1.channels.channel1.type = memory
> tier1.channels.channel1.capacity = 10000
> tier1.channels.channel1.transactionCapacity = 1000
> With these settings I noticed that events were not written to 'destinationtopic'.
> After debugging the agent I found that the Kafka source puts the topic name in the event headers:
> headers.put(KafkaSourceConstants.TOPIC, topic);
> The sink then checks whether the headers contain a topic. If one exists, it takes the topic name from the header and writes the event to that topic, thereby discarding the configured sink topic, i.e. destinationtopic.
> Here is the code snippet responsible. Even though the variable topic is set to destinationtopic, the header already carries a topic, so the Kafka sink takes the topic name from the header and writes the event to that topic, i.e. back to the source topic:
> if ((eventTopic = headers.get(TOPIC_HDR)) == null) {
>   eventTopic = topic;
> }



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
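
The behaviour described in the report can be reproduced outside Flume with a minimal standalone sketch. It is not the Flume source: the class TopicResolution and the method resolveTopic are hypothetical names, and it assumes that KafkaSourceConstants.TOPIC and TOPIC_HDR both resolve to the same header key, written here as "topic". Only the null check is taken from the snippet quoted in the issue.

```java
import java.util.HashMap;
import java.util.Map;

public class TopicResolution {
    // Assumption: source and sink use the same header key for the topic.
    static final String TOPIC_HDR = "topic";

    // Mirrors the check quoted in the issue: a topic header, when present,
    // takes precedence over the topic configured on the sink.
    public static String resolveTopic(Map<String, String> headers, String configuredTopic) {
        String eventTopic = headers.get(TOPIC_HDR);
        if (eventTopic == null) {
            eventTopic = configuredTopic;
        }
        return eventTopic;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();

        // What the Kafka source does according to the report:
        // it stamps each event with the topic it was read from.
        headers.put(TOPIC_HDR, "sourcetopic");
        System.out.println(resolveTopic(headers, "destinationtopic")); // prints "sourcetopic"

        // Without the header, the configured sink topic is used.
        headers.remove(TOPIC_HDR);
        System.out.println(resolveTopic(headers, "destinationtopic")); // prints "destinationtopic"
    }
}
```

The first call shows why events loop back to 'sourcetopic': the header set by the source wins over the sink configuration. The second call shows that the configured topic is only used when no topic header is present.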