Greetings all, I currently have flume deployed in a configuration where multiple servers run local flume agents that send logs to avro sources on central flume servers. The central servers send the data to elasticsearch using the elasticsearch sink. It generally works very well, but when the elasticsearch servers can't keep up, I would rather lose logs by having them routed to the null sink than have the services logging to this flume agent run into problems because events can't be delivered to the elasticsearch sink.
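For context, each local agent forwards to the central tier with a plain avro sink pointed at the avro source shown in the config further down. Roughly like this (agent/sink names and the central hostname here are placeholders, not my real config):

localAgent.sinks.avroForward.type = avro
localAgent.sinks.avroForward.hostname = <central-flume-host>
localAgent.sinks.avroForward.port = 16310
localAgent.sinks.avroForward.channel = memoryChannel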
When the elasticsearch sinks are lagging, I can see logged exceptions warning me that the sink likely can't keep up. The tricky bit is that the connection to the elasticsearch servers doesn't go down; the channel on the central flume server simply stays full and attempts to drain it keep failing. I have confirmed that failover to the null sink works if the elasticsearch servers are down, but not when they merely can't keep up. The current config of the central flume servers is provided below. Advice/insight would be much appreciated.

Thank you,
Ali Asad Lotia

agent1.sources = avroSource httpSource
agent1.channels = memoryChannel
agent1.sinks = nullSink elasticSink

# For each one of the sources, the type is defined
# Avro source definition
agent1.sources.avroSource.type = avro
agent1.sources.avroSource.bind = 0.0.0.0
agent1.sources.avroSource.port = 16310
agent1.sources.avroSource.threads = 8
agent1.sources.avroSource.channels = memoryChannel

# HTTP source definition defaults to the JSONHandler
agent1.sources.httpSource.type = http
agent1.sources.httpSource.bind = 0.0.0.0
agent1.sources.httpSource.port = 16311
agent1.sources.httpSource.channels = memoryChannel

agent1.sinkgroups = g1
agent1.sinkgroups.g1.sinks = nullSink
agent1.sinkgroups.g1.processor.type = failover
agent1.sinkgroups.g1.processor.priority.nullSink = 1
agent1.sinkgroups.g1.processor.priority.elasticSink = 10

agent1.sinks.nullSink.type = null
agent1.sinks.nullSink.channel = memoryChannel

# Each sink's type must be defined
agent1.sinks.elasticSink.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
agent1.sinks.elasticSink.hostNames = <my-elasticsearch-host>:16200
agent1.sinks.elasticSink.indexName = flume
agent1.sinks.elasticSink.indexType = logs
agent1.sinks.elasticSink.clusterName = <myorg>-us-east-1
agent1.sinks.elasticSink.batchSize = 100
agent1.sinks.elasticSink.ttl = 30
agent1.sinks.elasticSink.channel = memoryChannel
agent1.sinks.elasticSink.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer

# Each channel's type is defined.
agent1.channels.memoryChannel.type = memory

# Other config values specific to each type of channel (sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent1.channels.memoryChannel.capacity = 100000
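For comparison, if I'm reading the Flume user guide correctly, its failover sink processor example lists every sink of the group in the sinkgroup definition and also sets a maxpenalty, roughly like this (sink names here are the guide's illustrative ones, not mine):

a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000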
