We have a two-tier Flume topology. In the first tier, a custom interceptor hashes a field of each event and sets a header based on that hash value. A multiplexing channel selector then uses this header to direct events to the appropriate Tier 2 agent. In the second tier we store the events locally via file_roll and also write the same events to HDFS.
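For reference, the interceptor logic is roughly as follows. This is a simplified sketch of our eventTweaker class; the way the key field is extracted from the event body is illustrative only. The relevant part is that it sets the "bucket" header that the multiplexing selector reads, with the bucket names coming from the "hashbuckets" property shown in the Tier 1 configuration below.

package com.custom.flume.interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

public class eventTweaker implements Interceptor {

  private final String[] buckets;

  private eventTweaker(String[] buckets) {
    this.buckets = buckets;
  }

  @Override
  public void initialize() {
    // no-op
  }

  @Override
  public Event intercept(Event event) {
    // Illustrative: pull the key field out of the event body and hash it into a bucket.
    String body = new String(event.getBody(), StandardCharsets.UTF_8);
    String keyField = body.split(" ")[0];                 // real field extraction is more involved
    int index = Math.abs(keyField.hashCode()) % buckets.length;

    // Set the header that the multiplexing channel selector routes on.
    Map<String, String> headers = event.getHeaders();
    headers.put("bucket", buckets[index]);
    return event;
  }

  @Override
  public List<Event> intercept(List<Event> events) {
    for (Event event : events) {
      intercept(event);
    }
    return events;
  }

  @Override
  public void close() {
    // no-op
  }

  public static class Builder implements Interceptor.Builder {
    private String[] buckets;

    @Override
    public void configure(Context context) {
      // Matches the "hashbuckets" property in the agent configuration (e.g. bucket01,bucket02).
      buckets = context.getString("hashbuckets", "bucket01,bucket02").split(",");
    }

    @Override
    public Interceptor build() {
      return new eventTweaker(buckets);
    }
  }
}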
Everything works fine as long as we do not use failover sinks. As soon as we add the failover sink configuration in the first tier, our hashing logic is effectively overridden: even when all Tier 2 machines are up and running, some events that were meant for agent 1 (based on the hashing and multiplexing) end up on agent 2. We are running this test on three machines: one for Tier 1 (say machine A) and two for Tier 2 (say machines B and C). In Tier 2, machine C acts as the failover backup for the agent on machine B, and machine B acts as the failover backup for the agent on machine C. Any idea what could be wrong with this configuration? Below are the tier-wise configurations:

*Tier 1:*

agent1tier1.sources = tcpsrc
agent1tier1.sinks = avro-forward-ch01 avro-forward-ch01backup avro-forward-ch02 avro-forward-ch02backup
agent1tier1.channels = channelbucket01 channelbucket02

agent1tier1.channels.channelbucket01.type = file
agent1tier1.channels.channelbucket02.type = file

agent1tier1.sources.tcpsrc.channels = channelbucket01 channelbucket02
agent1tier1.sources.tcpsrc.type = syslogtcp
agent1tier1.sources.tcpsrc.port = 5149
agent1tier1.sources.tcpsrc.host = xx.xxx.x.104
agent1tier1.sources.tcpsrc.interceptors=i1
agent1tier1.sources.tcpsrc.interceptors.i1.type=timestamp

#################### INTERCEPTOR ##############################
agent1tier1.sources.tcpsrc.interceptors=logsintercept
agent1tier1.sources.tcpsrc.interceptors.logsintercept.type=com.custom.flume.interceptor.eventTweaker$Builder
agent1tier1.sources.tcpsrc.interceptors.logsintercept.hashbuckets = bucket01,bucket02
#################### END OF INTERCEPTOR ##############################

####################### SELECTOR ###########################
agent1tier1.sources.tcpsrc.selector.type=multiplexing
agent1tier1.sources.tcpsrc.selector.header = bucket
agent1tier1.sources.tcpsrc.selector.mapping.bucket01 = channelbucket01
agent1tier1.sources.tcpsrc.selector.mapping.bucket02 = channelbucket02
agent1tier1.sources.tcpsrc.selector.default = channelbucket01
##################### END OF SELECTOR #############################

#################### CHANNELS ##############################
agent1tier1.channels.channelbucket01.checkpointDir = /home/flume/channelbucket01/file-channel/checkpoint
agent1tier1.channels.channelbucket01.dataDirs = /home/flume/channelbucket01/file-channel/1/data,/home/flume/channelbucket01/file-channel/2/data,/home/flume/channelbucket01/file-channel/3/data
agent1tier1.channels.channelbucket02.checkpointDir = /home/flume/channelbucket02/file-channel/checkpoint
agent1tier1.channels.channelbucket02.dataDirs = /home/flume/channelbucket02/file-channel/1/data,/home/flume/channelbucket02/file-channel/2/data,/home/flume/channelbucket02/file-channel/3/data
#################### CHANNELS ##############################

################## CHANNELS CAPACITY ############################
agent1tier1.channels.channelbucket01.transactionCapacity = 1000000
agent1tier1.channels.channelbucket01.checkpointInterval = 30000
agent1tier1.channels.channelbucket01.maxFileSize = 2146435071
agent1tier1.channels.channelbucket01.capacity = 10000000
agent1tier1.channels.channelbucket02.transactionCapacity = 1000000
agent1tier1.channels.channelbucket02.checkpointInterval = 30000
agent1tier1.channels.channelbucket02.maxFileSize = 2146435071
agent1tier1.channels.channelbucket02.capacity = 10000000
################## END OF CHANNELS CAPACITY ############################

# avro sink properties
agent1tier1.sinks.avro-forward-ch01.channel = channelbucket01
agent1tier1.sinks.avro-forward-ch01.type = avro
agent1tier1.sinks.avro-forward-ch01.hostname = xx.xxx.x.106
agent1tier1.sinks.avro-forward-ch01.port = 10000

# avro sink properties
agent1tier1.sinks.avro-forward-ch01backup.channel = channelbucket01
agent1tier1.sinks.avro-forward-ch01backup.type = avro
agent1tier1.sinks.avro-forward-ch01backup.hostname = xx.xxx.x.29
agent1tier1.sinks.avro-forward-ch01backup.port = 19999

# avro sink properties
agent1tier1.sinks.avro-forward-ch02.channel = channelbucket02
agent1tier1.sinks.avro-forward-ch02.type = avro
agent1tier1.sinks.avro-forward-ch02.hostname = xx.xxx.x.29
agent1tier1.sinks.avro-forward-ch02.port = 19999

# avro sink properties
agent1tier1.sinks.avro-forward-ch02backup.channel = channelbucket02
agent1tier1.sinks.avro-forward-ch02backup.type = avro
agent1tier1.sinks.avro-forward-ch02backup.hostname = xx.xxx.x.106
agent1tier1.sinks.avro-forward-ch02backup.port = 10000

agent1tier1.sinkgroups = grpch1
agent1tier1.sinkgroups.grpch1.sinks = avro-forward-ch01 avro-forward-ch01backup
agent1tier1.sinkgroups.grpch1.processor.type = failover
#agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01 = 0
#agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01backup = 10
agent1tier1.sinkgroups.grpch1.processor.maxpenalty = 10000

agent1tier1.sinkgroups = grpch2
agent1tier1.sinkgroups.grpch2.sinks = avro-forward-ch02 avro-forward-ch02backup
agent1tier1.sinkgroups.grpch2.processor.type = failover
#agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02 = 1
#agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02backup = 11
agent1tier1.sinkgroups.grpch2.processor.maxpenalty = 10000
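Note that the priority lines in both sink groups are currently commented out. When priorities are set, the failover sink processor prefers the sink with the larger value, so presumably we would want the primary sink to carry the higher priority, along these lines (values illustrative):

agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01 = 10
agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01backup = 5
agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02 = 10
agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02backup = 5

With the priorities commented out we are relying on the order in which the sinks are listed in each group.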
*Tier 2:*

tier2.sources = avro-AppSrv-source
tier2.sinks = impsink convsink clksink rtsink hdfssinkrt hdfssinkdel
tier2.channels = channelconv channelimp channelclk channelrt channelhdfsrt channelhdfsdel

tier2.channels.channelimp.type = file
tier2.channels.channelconv.type = file
tier2.channels.channelclk.type = file
tier2.channels.channelrt.type = file
tier2.channels.channelhdfsrt.type = file
tier2.channels.channelhdfsdel.type = file

# For each source, channel, and sink, set standard properties.

# properties of avro-AppSrv-source
tier2.sources.avro-AppSrv-source.channels = channelconv channelimp channelclk channelrt channelhdfsrt channelhdfsdel
tier2.sources.avro-AppSrv-source.type = avro
tier2.sources.avro-AppSrv-source.bind = xx.xxx.x.106
tier2.sources.avro-AppSrv-source.port = 10000
tier2.sources.avro-AppSrv-source.selector.type=multiplexing
tier2.sources.avro-AppSrv-source.selector.header = rectype
tier2.sources.avro-AppSrv-source.selector.mapping.IMP= channelimp channelhdfsdel
tier2.sources.avro-AppSrv-source.selector.mapping.CLK = channelclk channelhdfsdel
tier2.sources.avro-AppSrv-source.selector.mapping.CONV = channelconv channelhdfsdel
tier2.sources.avro-AppSrv-source.selector.mapping.RT= channelrt channelhdfsrt
tier2.sources.avro-AppSrv-source.selector.default = channelhdfsdel

tier2.sinks.impsink.type = file_roll
tier2.sinks.impsink.channel = channelimp
tier2.sinks.impsink.sink.directory = /var/log/flume/imp
tier2.sinks.impsink.sink.rollInterval=60

tier2.sinks.convsink.type = file_roll
tier2.sinks.convsink.channel = channelconv
tier2.sinks.convsink.sink.directory = /var/log/flume/conv
tier2.sinks.convsink.sink.rollInterval=60

tier2.sinks.clksink.type = file_roll
tier2.sinks.clksink.channel = channelclk
tier2.sinks.clksink.sink.directory = /var/log/flume/clk
tier2.sinks.clksink.sink.rollInterval=60

tier2.sinks.rtsink.type = file_roll
tier2.sinks.rtsink.channel = channelrt
tier2.sinks.rtsink.sink.directory = /var/log/flume/rt
tier2.sinks.rtsink.sink.rollInterval=60

#################### CHANNELS ##############################
tier2.channels.channelimp.checkpointDir = /home/flume/channelimp/file-channel/checkpoint
tier2.channels.channelimp.dataDirs = /home/flume/channelimp/file-channel/1/data,/home/flume/channelimp/file-channel/2/data,/home/flume/channelimp/file-channel/3/data
tier2.channels.channelclk.checkpointDir = /home/flume/channelclk/file-channel/checkpoint
tier2.channels.channelclk.dataDirs = /home/flume/channelclk/file-channel/1/data,/home/flume/channelclk/file-channel/2/data,/home/flume/channelclk/file-channel/3/data
tier2.channels.channelconv.checkpointDir = /home/flume/channelconv/file-channel/checkpoint
tier2.channels.channelconv.dataDirs = /home/flume/channelconv/file-channel/1/data,/home/flume/channelconv/file-channel/2/data,/home/flume/channelconv/file-channel/3/data
tier2.channels.channelrt.checkpointDir = /home/flume/channelrt/file-channel/checkpoint
tier2.channels.channelrt.dataDirs = /home/flume/channelrt/file-channel/1/data,/home/flume/channelrt/file-channel/2/data,/home/flume/channelrt/file-channel/3/data
tier2.channels.channelhdfsrt.checkpointDir = /home/flume/channelhdfsrt/file-channel/checkpoint
tier2.channels.channelhdfsrt.dataDirs = /home/flume/channelhdfsrt/file-channel/1/data,/home/flume/channelhdfsrt/file-channel/2/data,/home/flume/channelhdfsrt/file-channel/3/data
tier2.channels.channelhdfsdel.checkpointDir = /home/flume/channelhdfsdel/file-channel/checkpoint
tier2.channels.channelhdfsdel.dataDirs = /home/flume/channelhdfsdel/file-channel/1/data,/home/flume/channelhdfsdel/file-channel/2/data,/home/flume/channelhdfsdel/file-channel/3/data
#################### CHANNELS ##############################

tier2.sinks.hdfssinkrt.type = hdfs
tier2.sinks.hdfssinkrt.channel = channelhdfsrt
tier2.sinks.hdfssinkrt.hdfs.path = hdfs://xx.xxx.x.102/user/dataplat/adslogs/rt/year=%Y/month=%m/day=%d/hour=%H
tier2.sinks.hdfssinkrt.hdfs.codeC = gzip
tier2.sinks.hdfssinkrt.hdfs.fileType = CompressedStream
tier2.sinks.hdfssinkrt.hdfs.filePrefix = FlumeSinkhdfsrt
# Roll based on the block size only
tier2.sinks.hdfssinkrt.hdfs.rollCount= 200000
tier2.sinks.hdfssinkrt.hdfs.rollInterval=120
tier2.sinks.hdfssinkrt.hdfs.rollSize = 0
# seconds to wait before closing the file.
#tier2.sinks.hdfssinkrt.hdfs.idleTimeout = 60
tier2.sinks.hdfssinkrt.hdfs.batchSize=20000
tier2.sinks.hdfssinkrt.hdfs.txnEventMax=20000
#tier2.sinks.hdfssinkrt.hdfs.threadsPoolSize=20
tier2.sinks.hdfssinkrt.hdfs.useLocalTimeStamp = false
tier2.sinks.hdfssinkrt.hdfs.callTimeout = 30000

tier2.sinks.hdfssinkdel.type = hdfs
tier2.sinks.hdfssinkdel.channel = channelhdfsdel
tier2.sinks.hdfssinkdel.hdfs.path = hdfs://xx.xxx.x.102/user/dataplat/adslogs/del/year=%Y/month=%m/day=%d/hour=%H
tier2.sinks.hdfssinkdel.hdfs.codeC = gzip
tier2.sinks.hdfssinkdel.hdfs.fileType = CompressedStream
tier2.sinks.hdfssinkdel.hdfs.filePrefix = FlumeSinkhdfsdel
# Roll based on the block size only
tier2.sinks.hdfssinkdel.hdfs.rollCount= 200000
tier2.sinks.hdfssinkdel.hdfs.rollInterval=120
tier2.sinks.hdfssinkdel.hdfs.rollSize = 0
# seconds to wait before closing the file.
#tier2.sinks.hdfssinkdel.hdfs.idleTimeout = 60
tier2.sinks.hdfssinkdel.hdfs.batchSize=20000
tier2.sinks.hdfssinkdel.hdfs.txnEventMax=20000
#tier2.sinks.hdfssinkdel.hdfs.threadsPoolSize=20
tier2.sinks.hdfssinkdel.hdfs.useLocalTimeStamp = false
tier2.sinks.hdfssinkdel.hdfs.callTimeout = 30000
#################### END OF SINKS ##############################
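The Tier 2 configuration above is the one running on machine B (Avro source bound to xx.xxx.x.106:10000). Machine C runs an analogous Tier 2 configuration; the relevant difference is the Avro source binding, which corresponds to the backup address used by the Tier 1 sinks:

tier2.sources.avro-AppSrv-source.bind = xx.xxx.x.29
tier2.sources.avro-AppSrv-source.port = 19999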
