We have a Flume agent that uses a spooldir source and an Elasticsearch sink.
Sometimes Flume stops processing newly added files in the spool
directory.
When I checked the Flume logs, I found this exception:
2016-05-17 00:04:25,859 ERROR org.apache.flume.source.SpoolDirectorySource: FATAL: Spool Directory source s1: { spoolDir: /DataCollection/Nflume_elstic/ }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
java.io.IOException: Unable to delete spool file: /DataCollection/Nflume_elstic/1463386734204-1489
    at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.deleteCurrentFile(ReliableSpoolingFileEventReader.java:396)
    at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.retireCurrentFile(ReliableSpoolingFileEventReader.java:316)
    at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:243)
    at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
2016-05-17 00:10:09,599 ERROR org.css.cssElasticsearchSink: Failed to commit transaction. Transaction rolled back.
org.elasticsearch.action.ActionRequestValidationException: Validation Failed: 1: no requests added;
    at org.elasticsearch.action.ValidateActions.addValidationError(ValidateActions.java:29)
    at org.elasticsearch.action.bulk.BulkRequest.validate(BulkRequest.java:459)
    at org.elasticsearch.action.TransportActionNodeProxy.execute(TransportActionNodeProxy.java:52)
    at org.elasticsearch.client.transport.support.InternalTransportClient$1.doWithNode(InternalTransportClient.java:109)
    at org.elasticsearch.client.transport.TransportClientNodesService.execute(TransportClientNodesService.java:205)
    at org.elasticsearch.client.transport.support.InternalTransportClient.execute(InternalTransportClient.java:106)
    at org.elasticsearch.client.support.AbstractClient.bulk(AbstractClient.java:163)
    at org.elasticsearch.client.transport.TransportClient.bulk(TransportClient.java:356)
    at org.elasticsearch.action.bulk.BulkRequestBuilder.doExecute(BulkRequestBuilder.java:164)
    at org.elasticsearch.action.ActionRequestBuilder.execute(ActionRequestBuilder.java:91)
    at org.elasticsearch.action.ActionRequestBuilder.execute(ActionRequestBuilder.java:65)
    at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.execute(ElasticSearchTransportClient.java:181)
    at org.css.cssElasticsearchSink.process(cssElasticsearchSink.java:143)
    at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:745)
2016-05-17 00:10:09,602 WARN org.apache.flume.sink.FailoverSinkProcessor: Sink es_sink2 failed and has been sent to failover list
org.elasticsearch.action.ActionRequestValidationException: Validation Failed: 1: no requests added;
    at org.elasticsearch.action.ValidateActions.addValidationError(ValidateActions.java:29)
    at org.elasticsearch.action.bulk.BulkRequest.validate(BulkRequest.java:459)
    at org.elasticsearch.action.TransportActionNodeProxy.execute(TransportActionNodeProxy.java:52)
    at org.elasticsearch.client.transport.support.InternalTransportClient$1.doWithNode(InternalTransportClient.java:109)
    at org.elasticsearch.client.transport.TransportClientNodesService.execute(TransportClientNodesService.java:205)
    at org.elasticsearch.client.transport.support.InternalTransportClient.execute(InternalTransportClient.java:106)
    at org.elasticsearch.client.support.AbstractClient.bulk(AbstractClient.java:163)
    at org.elasticsearch.client.transport.TransportClient.bulk(TransportClient.java:356)
    at org.elasticsearch.action.bulk.BulkRequestBuilder.doExecute(BulkRequestBuilder.java:164)
    at org.elasticsearch.action.ActionRequestBuilder.execute(ActionRequestBuilder.java:91)
    at org.elasticsearch.action.ActionRequestBuilder.execute(ActionRequestBuilder.java:65)
    at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.execute(ElasticSearchTransportClient.java:181)
    at org.css.cssElasticsearchSink.process(cssElasticsearchSink.java:143)
    at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:182)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:745)
The last part of the exception keeps repeating until we restart the
agent.
This issue happens frequently.
Do you have any idea what is causing this problem?