Kushal,

Have you considered removing the sinks from the sink group? This would increase your concurrency for processing channel events by allowing both sinks to read from the channel simultaneously; with a sink group in place, only one sink reads at a time.
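For example, based on the conf you posted (keeping your existing names k1/k2 and channel c1), a sketch of that change would be to delete the agent.sinkgroups lines and leave both sinks attached to c1:

```properties
# Sink group removed: no agent.sinkgroups / processor lines at all.
# Both sinks now run their own drain loop against c1 in parallel.
agent.sinks = k1 k2

agent.sinks.k1.type = avro
agent.sinks.k1.channel = c1
agent.sinks.k1.hostname = machine-1
agent.sinks.k1.port = 5300
agent.sinks.k1.batch-size = 10000

agent.sinks.k2.type = avro
agent.sinks.k2.channel = c1
agent.sinks.k2.hostname = machine-2
agent.sinks.k2.port = 5300
agent.sinks.k2.batch-size = 10000
```

You lose the load_balance processor's backoff behavior this way, but since both sinks pull from the same channel, the load still spreads across machine-1 and machine-2.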
Hope this helps.

-Jeff

On Fri, May 2, 2014 at 2:31 AM, Mangtani, Kushal <[email protected]> wrote:

> Hi,
>
> I'm using the Flume-NG 1.4 cdh4.4 tarball for collecting aggregated logs.
> I am running a two-tier (agent, collector) Flume configuration with custom
> plugins. There are approximately 20 agent machines (receiving data) and 6
> collector machines (writing to HDFS), all running independently. However,
> the channel in the agent is not able to keep up with input events, causing
> the channel to fill up and drop events.
>
> Key points:
>
> 1. The input rate is 2000 events/sec; the average size of each event is
>    2 KB. At peak, we have 4 MB/sec of input traffic.
> 2. After some debugging, we inferred that the sink was not draining
>    events fast enough, so:
>    a. We tried changing the sink from Avro to Thrift.
>    b. We also decided to increase parallelism in the agent's channels and
>       sinks, so we used channel multiplexing and distributed the traffic
>       across multiple channels instead of one.
>    However, neither 2a nor 2b helped.
> 3. I have set -Xms and -Xmx to 1 GB and 8 GB respectively.
>
> Agent conf:
>
> # Name the components on this agent
> agent.sources = r1
> agent.channels = c1
> agent.sinks = k1 k2
>
> # Describe/configure the source
> agent.sources.r1.type = CustomSource-1
> agent.sources.r1.port = 4000
> agent.sources.r1.containsVersion = true
> agent.sources.r1.channels = c1
> agent.sources.r1.interceptors = i1 i2
> agent.sources.r1.interceptors.i1.type = CustomInterceptor-1
> agent.sources.r1.interceptors.i1.schemaFolder = /usr/lib/flume-ng/schema
> agent.sources.r1.interceptors.i1.discardEventsAfterDays = 7
> agent.sources.r1.interceptors.i2.type = CustomInterceptor-2
> agent.sources.r1.interceptors.i2.schemaFolder = /usr/lib/flume-ng/schema
> agent.sources.r1.interceptors.i2.optoutCron = 0 * * * *
>
> # Use a channel which buffers events in memory
> agent.channels.c1.type = memory
> agent.channels.c1.capacity = 1000000
> agent.channels.c1.transactionCapacity = 10000
>
> # Load-balancing sink group
> agent.sinkgroups = g1
> agent.sinkgroups.g1.sinks = k1 k2
> agent.sinkgroups.g1.processor.type = load_balance
> agent.sinkgroups.g1.processor.backoff = true
> agent.sinkgroups.g1.processor.selector = random
> agent.sinkgroups.g1.processor.selector.maxTimeOut = 64000
>
> # Describe the sink k1
> agent.sinks.k1.type = avro
> agent.sinks.k1.channel = c1
> agent.sinks.k1.hostname = machine-1
> agent.sinks.k1.port = 5300
> agent.sinks.k1.batch-size = 10000
>
> # Describe the sink k2
> agent.sinks.k2.type = avro
> agent.sinks.k2.channel = c1
> agent.sinks.k2.hostname = machine-2
> agent.sinks.k2.port = 5300
> agent.sinks.k2.batch-size = 10000
>
> FYI: I have done a lot of tweaking across the channel transactionCapacity
> and sink batch-size; eventually we settled on a value of 10,000 for both
> properties.
>
> Questions:
>
> 1. Could you tell me how I can increase the downstream rate of the channel
>    such that the channel never gets full? Ideally, we want the sink to
>    drain events from the channel at the same rate at which they are put
>    into the channel.
>
> Your inputs/suggestions will be thoroughly appreciated.
>
> Regards,
> Kushal Mangtani
> Software Engineer
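One more back-of-the-envelope check on the numbers quoted above (a rough sketch assuming the stated 2000 events/sec, 2 KB/event, and capacity = 1,000,000; it ignores Flume's per-event object overhead, so treat it as a lower bound on heap):

```python
# Rough sizing math for the memory channel using the figures from the
# mail: 2000 events/sec input, 2 KB average event, capacity 1,000,000.
events_per_sec = 2000
event_size_kb = 2
channel_capacity = 1_000_000

# Heap needed just to hold a completely full channel (payload only):
full_channel_gb = channel_capacity * event_size_kb / (1024 * 1024)

# Seconds until the channel fills if the sinks drain nothing at all:
buffer_seconds = channel_capacity / events_per_sec

print(f"full channel ~{full_channel_gb:.1f} GB payload, "
      f"~{buffer_seconds:.0f} s of headroom at zero drain")
```

So a full channel holds roughly 1.9 GB of payload (well inside the 8 GB -Xmx, but far above the 1 GB -Xms), and at the stated input rate the channel fills in about 500 seconds whenever the sinks stall completely — which is why sustained drain rate, not channel size, is the knob that matters here.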
