[ 
https://issues.apache.org/jira/browse/FLUME-2814?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Li Ye updated FLUME-2814:
-------------------------
    Attachment: FLUME-2789-2.patch

> flume kafka sink does not write events to configured sink topic when source 
> is also from other topic of kafka
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: FLUME-2814
>                 URL: https://issues.apache.org/jira/browse/FLUME-2814
>             Project: Flume
>          Issue Type: Bug
>          Components: Sinks+Sources
>    Affects Versions: v1.5.0
>            Reporter: Manohar
>         Attachments: FLUME-2789-1.patch, FLUME-2789-2.patch
>
>
> I was testing a case when flume agent is reading from kafka source from topic 
> 'sourcetopic' and sink configured to kafkasink but to other topic 
> 'destinationtopic', 
> tier1.sources  = source1
> tier1.channels = channel1
> tier1.sinks    = sink1
> tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
> tier1.sources.source1.channels = channel1
> tier1.sources.source1.zookeeperConnect = localhost:2181
> tier1.sources.source1.topic = sourcetopic
> tier1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
> tier1.sinks.sink1.topic = destinationtopic
> tier1.sinks.sink1.brokerList = localhost:9092
> tier1.sinks.sink1.channel = channel1
> tier1.channels.channel1.type = memory
> tier1.channels.channel1.capacity = 10000
> tier1.channels.channel1.transactionCapacity = 1000
> With this settings i noticed that event were not written to 
> 'destinationtopic', 
> After debugging the agent if found that kafka source puts in topic name in 
> header. 
> headers.put(KafkaSourceConstants.TOPIC, topic);
> and in sink check is made to see if headers contain topic, if exists then we 
> take topic name from header and write event that topic and there by 
> discarding configured sink topic i ,e destinationtopic.
> here is code snippet that does, even though variable topic as 
> destinationtopic, since header had topic, kafka sink takes topic name from 
> header and puts event to that topic i,e again to source topic
>         if ((eventTopic = headers.get(TOPIC_HDR)) == null) {
>           eventTopic = topic;
>         }



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to