Jun Seok Hong created FLUME-2811: ------------------------------------ Summary: Taildir source doesn't call stop() on graceful shutdown Key: FLUME-2811 URL: https://issues.apache.org/jira/browse/FLUME-2811 Project: Flume Issue Type: Bug Components: Sinks+Sources Affects Versions: v1.7.0 Reporter: Jun Seok Hong Priority: Critical
Taildir source doesn't call stop() on graceful shutdown. Test configuration. source - taildir channel - PseudoTxnMemoryChannel / flume-kafka-channel sink - none I found that flume sometimes doesn't terminate with Taildir source. I had to kill the process to terminate it. tailFileProcess() function in TaildirSource.java has a infinite loop. When the process interrupted, ChannelException will happen, but it can't breaks the infinite loop. I think that's the reason why Taildir can't call stop() function. {code:title=TaildirSource.java|borderStyle=solid} private void tailFileProcess(TailFile tf, boolean backoffWithoutNL) throws IOException, InterruptedException { while (true) { reader.setCurrentFile(tf); List<Event> events = reader.readEvents(batchSize, backoffWithoutNL); if (events.isEmpty()) { break; } sourceCounter.addToEventReceivedCount(events.size()); sourceCounter.incrementAppendBatchReceivedCount(); try { getChannelProcessor().processEventBatch(events); reader.commit(); } catch (ChannelException ex) { logger.warn("The channel is full or unexpected failure. " + "The source will try again after " + retryInterval + " ms"); TimeUnit.MILLISECONDS.sleep(retryInterval); retryInterval = retryInterval << 1; retryInterval = Math.min(retryInterval, maxRetryInterval); continue; } retryInterval = 1000; sourceCounter.addToEventAcceptedCount(events.size()); sourceCounter.incrementAppendBatchAcceptedCount(); if (events.size() < batchSize) { break; } } } {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)