[ https://issues.apache.org/jira/browse/FLUME-2811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Umesh Chaudhary reassigned FLUME-2811: -------------------------------------- Assignee: Umesh Chaudhary > Taildir source doesn't call stop() on graceful shutdown > ------------------------------------------------------- > > Key: FLUME-2811 > URL: https://issues.apache.org/jira/browse/FLUME-2811 > Project: Flume > Issue Type: Bug > Components: Sinks+Sources > Affects Versions: v1.7.0 > Reporter: Jun Seok Hong > Assignee: Umesh Chaudhary > Priority: Critical > Labels: newbie > Fix For: v1.7.0 > > > Taildir source doesn't call stop() on graceful shutdown. > Test configuration. > source - taildir > channel - PseudoTxnMemoryChannel / flume-kafka-channel > sink - none > I found that flume sometimes doesn't terminate with Taildir source. > I had to kill the process to terminate it. > tailFileProcess() function in TaildirSource.java has a infinite loop. > When the process interrupted, ChannelException will happen, but it can't > breaks the infinite loop. > I think that's the reason why Taildir can't call stop() function. > {code:title=TaildirSource.java|borderStyle=solid} > private void tailFileProcess(TailFile tf, boolean backoffWithoutNL) > throws IOException, InterruptedException { > while (true) { > reader.setCurrentFile(tf); > List<Event> events = reader.readEvents(batchSize, backoffWithoutNL); > if (events.isEmpty()) { > break; > } > sourceCounter.addToEventReceivedCount(events.size()); > sourceCounter.incrementAppendBatchReceivedCount(); > try { > getChannelProcessor().processEventBatch(events); > reader.commit(); > } catch (ChannelException ex) { > logger.warn("The channel is full or unexpected failure. " + > "The source will try again after " + retryInterval + " ms"); > TimeUnit.MILLISECONDS.sleep(retryInterval); > retryInterval = retryInterval << 1; > retryInterval = Math.min(retryInterval, maxRetryInterval); > continue; > } > retryInterval = 1000; > sourceCounter.addToEventAcceptedCount(events.size()); > sourceCounter.incrementAppendBatchAcceptedCount(); > if (events.size() < batchSize) { > break; > } > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)