Hi Hari,

This is our latest config:

agent1tier1.sources = tcpsrc
agent1tier1.sinks = avro-forward-ch01 avro-forward-ch01backup avro-forward-ch02 avro-forward-ch02backup
agent1tier1.channels = channelbucket01 channelbucket02
agent1tier1.channels.channelbucket01.type = file
agent1tier1.channels.channelbucket02.type = file

agent1tier1.sources.tcpsrc.channels = channelbucket01 channelbucket02
agent1tier1.sources.tcpsrc.type = syslogtcp
agent1tier1.sources.tcpsrc.port = 5149
agent1tier1.sources.tcpsrc.host = xx.xxx.x.104
agent1tier1.sources.tcpsrc.interceptors=i1
agent1tier1.sources.tcpsrc.interceptors.i1.type=timestamp

#################### INTERCEPTOR ##############################
agent1tier1.sources.tcpsrc.interceptors=logsintercept
agent1tier1.sources.tcpsrc.interceptors.logsintercept.type=com.custom.flume.interceptor.eventTweaker$Builder
agent1tier1.sources.tcpsrc.interceptors.logsintercept.hashbuckets = bucket01,bucket02
#################### END OF INTERCEPTOR ##############################

####################### SELECTOR ###########################
agent1tier1.sources.tcpsrc.selector.type=multiplexing
agent1tier1.sources.tcpsrc.selector.header = bucket
agent1tier1.sources.tcpsrc.selector.mapping.bucket01 = channelbucket01
agent1tier1.sources.tcpsrc.selector.mapping.bucket02 = channelbucket02
agent1tier1.sources.tcpsrc.selector.default = channelbucket01
##################### END OF SELECTOR #############################

#################### CHANNELS ##############################
agent1tier1.channels.channelbucket01.checkpointDir = /home/flume/channelbucket01/file-channel/checkpoint
agent1tier1.channels.channelbucket01.dataDirs = /home/flume/channelbucket01/file-channel/1/data,/home/flume/channelbucket01/file-channel/2/data,/home/flume/channelbucket01/file-channel/3/data

agent1tier1.channels.channelbucket02.checkpointDir = /home/flume/channelbucket02/file-channel/checkpoint
agent1tier1.channels.channelbucket02.dataDirs = /home/flume/channelbucket02/file-channel/1/data,/home/flume/channelbucket02/file-channel/2/data,/home/flume/channelbucket02/file-channel/3/data
#################### CHANNELS ##############################

################## CHANNELS CAPACITY ############################
agent1tier1.channels.channelbucket01.transactionCapacity = 1000000
agent1tier1.channels.channelbucket01.checkpointInterval = 30000
agent1tier1.channels.channelbucket01.maxFileSize = 2146435071
agent1tier1.channels.channelbucket01.capacity = 10000000

agent1tier1.channels.channelbucket02.transactionCapacity = 1000000
agent1tier1.channels.channelbucket02.checkpointInterval = 30000
agent1tier1.channels.channelbucket02.maxFileSize = 2146435071
agent1tier1.channels.channelbucket02.capacity = 10000000
################## END OF CHANNELS CAPACITY ############################

# avro sink properties
agent1tier1.sinks.avro-forward-ch01.channel = channelbucket01
agent1tier1.sinks.avro-forward-ch01.type = avro
agent1tier1.sinks.avro-forward-ch01.hostname = xx.xxx.x.106
agent1tier1.sinks.avro-forward-ch01.port = 10000

# avro sink properties
agent1tier1.sinks.avro-forward-ch01backup.channel = channelbucket01
agent1tier1.sinks.avro-forward-ch01backup.type = avro
agent1tier1.sinks.avro-forward-ch01backup.hostname = xx.xxx.x.29
agent1tier1.sinks.avro-forward-ch01backup.port = 19999

# avro sink properties
agent1tier1.sinks.avro-forward-ch02.channel = channelbucket02
agent1tier1.sinks.avro-forward-ch02.type = avro
agent1tier1.sinks.avro-forward-ch02.hostname = xx.xxx.x.29
agent1tier1.sinks.avro-forward-ch02.port = 19999

# avro sink properties
agent1tier1.sinks.avro-forward-ch02backup.channel = channelbucket02
agent1tier1.sinks.avro-forward-ch02backup.type = avro
agent1tier1.sinks.avro-forward-ch02backup.hostname = xx.xxx.x.106
agent1tier1.sinks.avro-forward-ch02backup.port = 10000

agent1tier1.sinkgroups = grpch1
agent1tier1.sinkgroups.grpch1.sinks = avro-forward-ch01 avro-forward-ch01backup
agent1tier1.sinkgroups.grpch1.processor.type = failover
#agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01 = 10
#agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01backup = 2
agent1tier1.sinkgroups.grpch1.processor.maxpenalty = 10000

agent1tier1.sinkgroups = grpch2
agent1tier1.sinkgroups.grpch2.sinks = avro-forward-ch02 avro-forward-ch02backup
agent1tier1.sinkgroups.grpch2.processor.type = failover
#agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02 = 11
#agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02backup = 1
agent1tier1.sinkgroups.grpch2.processor.maxpenalty = 10000

Regards
Mohit

On Thu, Sep 18, 2014 at 10:38 AM, Hari Shreedharan <[email protected]> wrote:

> Can you send your latest config?
>
> Thanks,
> Hari
>
> On Tue, Sep 16, 2014 at 6:01 AM, Mohit Durgapal <[email protected]> wrote:
>
>> We have a two-stage topology in Flume in which, in the first tier, we
>> add headers based on the hash value of a field in the event.
>> The hashing logic lives in an interceptor in Tier 1 of the Flume topology,
>> which sets a header field. We then use a multiplexing selector to direct
>> events to Tier 2 based on that header field.
>> In the second tier we store the events locally using file_roll and
>> also store the same events in HDFS.
>>
>> Everything works fine when we are not using failover sinks. When we
>> add the failover sink configuration in the first tier, our hashing logic
>> gets overridden. That is, even when all the machines in Tier 2 are
>> up and running, some events that were meant for Flume agent 1 (based on
>> hashing and multiplexing) go to agent 2.
>>
>> We are performing this test on three machines: one machine for Tier 1
>> (let's say machine A) and two machines (let's say machines B and C) for Tier 2.
>> In Tier 2, for the Flume agent on machine B, machine C acts as the failover
>> backup, and for the Flume agent on machine C, machine B acts as the failover
>> backup.
>>
>> Any idea what could be wrong with this configuration?
>>
>> Below are the tier-wise configurations:
>>
>> *Tier 1:*
>>
>> agent1tier1.sources = tcpsrc
>> agent1tier1.sinks = avro-forward-ch01 avro-forward-ch01backup avro-forward-ch02 avro-forward-ch02backup
>>
>> agent1tier1.channels = channelbucket01 channelbucket02
>> agent1tier1.channels.channelbucket01.type = file
>> agent1tier1.channels.channelbucket02.type = file
>>
>> agent1tier1.sources.tcpsrc.channels = channelbucket01 channelbucket02
>> agent1tier1.sources.tcpsrc.type = syslogtcp
>> agent1tier1.sources.tcpsrc.port = 5149
>> agent1tier1.sources.tcpsrc.host = xx.xxx.x.104
>> agent1tier1.sources.tcpsrc.interceptors=i1
>> agent1tier1.sources.tcpsrc.interceptors.i1.type=timestamp
>>
>> #################### INTERCEPTOR ##############################
>> agent1tier1.sources.tcpsrc.interceptors=logsintercept
>> agent1tier1.sources.tcpsrc.interceptors.logsintercept.type=com.custom.flume.interceptor.eventTweaker$Builder
>> agent1tier1.sources.tcpsrc.interceptors.logsintercept.hashbuckets = bucket01,bucket02
>> #################### END OF INTERCEPTOR ##############################
>>
>> ####################### SELECTOR ###########################
>> agent1tier1.sources.tcpsrc.selector.type=multiplexing
>> agent1tier1.sources.tcpsrc.selector.header = bucket
>> agent1tier1.sources.tcpsrc.selector.mapping.bucket01 = channelbucket01
>> agent1tier1.sources.tcpsrc.selector.mapping.bucket02 = channelbucket02
>> agent1tier1.sources.tcpsrc.selector.default = channelbucket01
>> ##################### END OF SELECTOR #############################
>>
>> #################### CHANNELS ##############################
>> agent1tier1.channels.channelbucket01.checkpointDir = /home/flume/channelbucket01/file-channel/checkpoint
>> agent1tier1.channels.channelbucket01.dataDirs = /home/flume/channelbucket01/file-channel/1/data,/home/flume/channelbucket01/file-channel/2/data,/home/flume/channelbucket01/file-channel/3/data
>>
>> agent1tier1.channels.channelbucket02.checkpointDir = /home/flume/channelbucket02/file-channel/checkpoint
>> agent1tier1.channels.channelbucket02.dataDirs = /home/flume/channelbucket02/file-channel/1/data,/home/flume/channelbucket02/file-channel/2/data,/home/flume/channelbucket02/file-channel/3/data
>> #################### CHANNELS ##############################
>>
>> ################## CHANNELS CAPACITY ############################
>> agent1tier1.channels.channelbucket01.transactionCapacity = 1000000
>> agent1tier1.channels.channelbucket01.checkpointInterval = 30000
>> agent1tier1.channels.channelbucket01.maxFileSize = 2146435071
>> agent1tier1.channels.channelbucket01.capacity = 10000000
>>
>> agent1tier1.channels.channelbucket02.transactionCapacity = 1000000
>> agent1tier1.channels.channelbucket02.checkpointInterval = 30000
>> agent1tier1.channels.channelbucket02.maxFileSize = 2146435071
>> agent1tier1.channels.channelbucket02.capacity = 10000000
>> ################## END OF CHANNELS CAPACITY ############################
>>
>> # avro sink properties
>> agent1tier1.sinks.avro-forward-ch01.channel = channelbucket01
>> agent1tier1.sinks.avro-forward-ch01.type = avro
>> agent1tier1.sinks.avro-forward-ch01.hostname = xx.xxx.x.106
>> agent1tier1.sinks.avro-forward-ch01.port = 10000
>>
>> # avro sink properties
>> agent1tier1.sinks.avro-forward-ch01backup.channel = channelbucket01
>> agent1tier1.sinks.avro-forward-ch01backup.type = avro
>> agent1tier1.sinks.avro-forward-ch01backup.hostname = xx.xxx.x.29
>> agent1tier1.sinks.avro-forward-ch01backup.port = 19999
>>
>> # avro sink properties
>> agent1tier1.sinks.avro-forward-ch02.channel = channelbucket02
>> agent1tier1.sinks.avro-forward-ch02.type = avro
>> agent1tier1.sinks.avro-forward-ch02.hostname = xx.xxx.x.29
>> agent1tier1.sinks.avro-forward-ch02.port = 19999
>>
>> # avro sink properties
>> agent1tier1.sinks.avro-forward-ch02backup.channel = channelbucket02
>> agent1tier1.sinks.avro-forward-ch02backup.type = avro
>> agent1tier1.sinks.avro-forward-ch02backup.hostname = xx.xxx.x.106
>> agent1tier1.sinks.avro-forward-ch02backup.port = 10000
>>
>> agent1tier1.sinkgroups = grpch1
>> agent1tier1.sinkgroups.grpch1.sinks = avro-forward-ch01 avro-forward-ch01backup
>> agent1tier1.sinkgroups.grpch1.processor.type = failover
>> #agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01 = 0
>> #agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01backup = 10
>> agent1tier1.sinkgroups.grpch1.processor.maxpenalty = 10000
>>
>> agent1tier1.sinkgroups = grpch2
>> agent1tier1.sinkgroups.grpch2.sinks = avro-forward-ch02 avro-forward-ch02backup
>> agent1tier1.sinkgroups.grpch2.processor.type = failover
>> #agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02 = 1
>> #agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02backup = 11
>> agent1tier1.sinkgroups.grpch2.processor.maxpenalty = 10000
>>
>> *Tier 2:*
>>
>> tier2.sources = avro-AppSrv-source
>> tier2.sinks = impsink convsink clksink rtsink hdfssinkrt hdfssinkdel
>> tier2.channels = channelconv channelimp channelclk channelrt channelhdfsrt channelhdfsdel
>> tier2.channels.channelimp.type = file
>> tier2.channels.channelconv.type = file
>> tier2.channels.channelclk.type = file
>> tier2.channels.channelrt.type = file
>> tier2.channels.channelhdfsrt.type = file
>> tier2.channels.channelhdfsdel.type = file
>>
>> # For each source, channel, and sink, set
>> # standard properties.
>> # properties of avro-AppSrv-source
>> tier2.sources.avro-AppSrv-source.channels = channelconv channelimp channelclk channelrt channelhdfsrt channelhdfsdel
>> tier2.sources.avro-AppSrv-source.type = avro
>> tier2.sources.avro-AppSrv-source.bind = xx.xxx.x.106
>> tier2.sources.avro-AppSrv-source.port = 10000
>>
>> tier2.sources.avro-AppSrv-source.selector.type=multiplexing
>> tier2.sources.avro-AppSrv-source.selector.header = rectype
>> tier2.sources.avro-AppSrv-source.selector.mapping.IMP= channelimp channelhdfsdel
>> tier2.sources.avro-AppSrv-source.selector.mapping.CLK = channelclk channelhdfsdel
>> tier2.sources.avro-AppSrv-source.selector.mapping.CONV = channelconv channelhdfsdel
>> tier2.sources.avro-AppSrv-source.selector.mapping.RT= channelrt channelhdfsrt
>>
>> tier2.sources.avro-AppSrv-source.selector.default = channelhdfsdel
>>
>> tier2.sinks.impsink.type = file_roll
>> tier2.sinks.impsink.channel = channelimp
>> tier2.sinks.impsink.sink.directory = /var/log/flume/imp
>> tier2.sinks.impsink.sink.rollInterval=60
>>
>> tier2.sinks.convsink.type = file_roll
>> tier2.sinks.convsink.channel = channelconv
>> tier2.sinks.convsink.sink.directory = /var/log/flume/conv
>> tier2.sinks.convsink.sink.rollInterval=60
>>
>> tier2.sinks.clksink.type = file_roll
>> tier2.sinks.clksink.channel = channelclk
>> tier2.sinks.clksink.sink.directory = /var/log/flume/clk
>> tier2.sinks.clksink.sink.rollInterval=60
>>
>> tier2.sinks.rtsink.type = file_roll
>> tier2.sinks.rtsink.channel = channelrt
>> tier2.sinks.rtsink.sink.directory = /var/log/flume/rt
>> tier2.sinks.rtsink.sink.rollInterval=60
>>
>> #################### CHANNELS ##############################
>>
>> tier2.channels.channelimp.checkpointDir = /home/flume/channelimp/file-channel/checkpoint
>> tier2.channels.channelimp.dataDirs = /home/flume/channelimp/file-channel/1/data,/home/flume/channelimp/file-channel/2/data,/home/flume/channelimp/file-channel/3/data
>>
>> tier2.channels.channelclk.checkpointDir = /home/flume/channelclk/file-channel/checkpoint
>> tier2.channels.channelclk.dataDirs = /home/flume/channelclk/file-channel/1/data,/home/flume/channelclk/file-channel/2/data,/home/flume/channelclk/file-channel/3/data
>>
>> tier2.channels.channelconv.checkpointDir = /home/flume/channelconv/file-channel/checkpoint
>> tier2.channels.channelconv.dataDirs = /home/flume/channelconv/file-channel/1/data,/home/flume/channelconv/file-channel/2/data,/home/flume/channelconv/file-channel/3/data
>>
>> tier2.channels.channelrt.checkpointDir = /home/flume/channelrt/file-channel/checkpoint
>> tier2.channels.channelrt.dataDirs = /home/flume/channelrt/file-channel/1/data,/home/flume/channelrt/file-channel/2/data,/home/flume/channelrt/file-channel/3/data
>>
>> tier2.channels.channelhdfsrt.checkpointDir = /home/flume/channelhdfsrt/file-channel/checkpoint
>> tier2.channels.channelhdfsrt.dataDirs = /home/flume/channelhdfsrt/file-channel/1/data,/home/flume/channelhdfsrt/file-channel/2/data,/home/flume/channelhdfsrt/file-channel/3/data
>>
>> tier2.channels.channelhdfsdel.checkpointDir = /home/flume/channelhdfsdel/file-channel/checkpoint
>> tier2.channels.channelhdfsdel.dataDirs = /home/flume/channelhdfsdel/file-channel/1/data,/home/flume/channelhdfsdel/file-channel/2/data,/home/flume/channelhdfsdel/file-channel/3/data
>>
>> #################### CHANNELS ##############################
>>
>> tier2.sinks.hdfssinkrt.type = hdfs
>> tier2.sinks.hdfssinkrt.channel = channelhdfsrt
>> tier2.sinks.hdfssinkrt.hdfs.path = hdfs://xx.xxx.x.102/user/dataplat/adslogs/rt/year=%Y/month=%m/day=%d/hour=%H
>> tier2.sinks.hdfssinkrt.hdfs.codeC = gzip
>> tier2.sinks.hdfssinkrt.hdfs.fileType = CompressedStream
>> tier2.sinks.hdfssinkrt.hdfs.filePrefix = FlumeSinkhdfsrt
>> # Roll every 120 s or every 200000 events, whichever comes first (rollSize = 0 disables size-based rolling)
>> tier2.sinks.hdfssinkrt.hdfs.rollCount= 200000
>> tier2.sinks.hdfssinkrt.hdfs.rollInterval=120
>> tier2.sinks.hdfssinkrt.hdfs.rollSize = 0
>> # seconds to wait before closing the file.
>> #tier2.sinks.hdfssinkrt.hdfs.idleTimeout = 60
>> tier2.sinks.hdfssinkrt.hdfs.batchSize=20000
>> tier2.sinks.hdfssinkrt.hdfs.txnEventMax=20000
>> #tier2.sinks.hdfssinkrt.hdfs.threadsPoolSize=20
>> tier2.sinks.hdfssinkrt.hdfs.useLocalTimeStamp = false
>> tier2.sinks.hdfssinkrt.hdfs.callTimeout = 30000
>>
>> tier2.sinks.hdfssinkdel.type = hdfs
>> tier2.sinks.hdfssinkdel.channel = channelhdfsdel
>> tier2.sinks.hdfssinkdel.hdfs.path = hdfs://xx.xxx.x.102/user/dataplat/adslogs/del/year=%Y/month=%m/day=%d/hour=%H
>> tier2.sinks.hdfssinkdel.hdfs.codeC = gzip
>> tier2.sinks.hdfssinkdel.hdfs.fileType = CompressedStream
>> tier2.sinks.hdfssinkdel.hdfs.filePrefix = FlumeSinkhdfsdel
>> # Roll every 120 s or every 200000 events, whichever comes first (rollSize = 0 disables size-based rolling)
>> tier2.sinks.hdfssinkdel.hdfs.rollCount= 200000
>> tier2.sinks.hdfssinkdel.hdfs.rollInterval=120
>> tier2.sinks.hdfssinkdel.hdfs.rollSize = 0
>> # seconds to wait before closing the file.
>> #tier2.sinks.hdfssinkdel.hdfs.idleTimeout = 60
>> tier2.sinks.hdfssinkdel.hdfs.batchSize=20000
>> tier2.sinks.hdfssinkdel.hdfs.txnEventMax=20000
>> #tier2.sinks.hdfssinkdel.hdfs.threadsPoolSize=20
>> tier2.sinks.hdfssinkdel.hdfs.useLocalTimeStamp = false
>> tier2.sinks.hdfssinkdel.hdfs.callTimeout = 30000
>> #################### END OF SINKS ##############################
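A likely cause, offered as a hedged reading of the Tier 1 config rather than a confirmed diagnosis: Flume loads the agent file with java.util.Properties, where a key assigned twice keeps only its last value. Because agent1tier1.sinkgroups is set once to grpch1 and again to grpch2, only grpch2 exists. avro-forward-ch01 and avro-forward-ch01backup then belong to no group, so Flume runs each of them on its own and both drain channelbucket01 concurrently, splitting bucket01 events between xx.xxx.x.106 and xx.xxx.x.29 even when both Tier 2 agents are healthy. That would also fit the observation that everything works without the failover setup, where presumably only one sink read each channel. The sketch below declares both groups on one line and enables the priorities that are commented out in the latest config; only the sink-group lines change.

```properties
# Both groups on a single line; a second "sinkgroups =" line would replace the first.
agent1tier1.sinkgroups = grpch1 grpch2

agent1tier1.sinkgroups.grpch1.sinks = avro-forward-ch01 avro-forward-ch01backup
agent1tier1.sinkgroups.grpch1.processor.type = failover
# Higher value = higher priority, so avro-forward-ch01 (xx.xxx.x.106) is tried first.
agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01 = 10
agent1tier1.sinkgroups.grpch1.processor.priority.avro-forward-ch01backup = 2
agent1tier1.sinkgroups.grpch1.processor.maxpenalty = 10000

agent1tier1.sinkgroups.grpch2.sinks = avro-forward-ch02 avro-forward-ch02backup
agent1tier1.sinkgroups.grpch2.processor.type = failover
# avro-forward-ch02 (xx.xxx.x.29) is preferred over its backup on xx.xxx.x.106.
agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02 = 11
agent1tier1.sinkgroups.grpch2.processor.priority.avro-forward-ch02backup = 1
agent1tier1.sinkgroups.grpch2.processor.maxpenalty = 10000
```

The priority values follow the latest config. The quoted original config had them reversed (0 for avro-forward-ch01 and 10 for its backup), which, if uncommented, would send bucket01 to the backup first.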

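Another duplicated key in Tier 1 may be worth checking: agent1tier1.sources.tcpsrc.interceptors is set to i1 and later to logsintercept. Since Flume loads the file with java.util.Properties and the last assignment of a key wins, only logsintercept is configured and the timestamp interceptor i1 is silently dropped. That can matter for Tier 2, whose HDFS sinks use %Y/%m/%d/%H in hdfs.path with useLocalTimeStamp = false and therefore need a timestamp header on every event. A sketch, assuming both interceptors are wanted and the timestamp should be added before the hashing interceptor runs (interceptors are applied in the order listed):

```properties
# List every interceptor on one line; they run in this order.
agent1tier1.sources.tcpsrc.interceptors = i1 logsintercept
agent1tier1.sources.tcpsrc.interceptors.i1.type = timestamp
agent1tier1.sources.tcpsrc.interceptors.logsintercept.type = com.custom.flume.interceptor.eventTweaker$Builder
agent1tier1.sources.tcpsrc.interceptors.logsintercept.hashbuckets = bucket01,bucket02
```

If the syslog source already sets a usable timestamp header, i1 may be redundant; in that case, removing the first interceptors line avoids the confusion of a key that never takes effect.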