[ https://issues.apache.org/jira/browse/FLUME-3032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

刘天昊 updated FLUME-3032:
-----------------------
    Attachment: FLUME-3032.patch

> taildir source sleeps frequently.
> ---------------------------------
>
>                 Key: FLUME-3032
>                 URL: https://issues.apache.org/jira/browse/FLUME-3032
>             Project: Flume
>          Issue Type: Bug
>          Components: Sinks+Sources
>    Affects Versions: v1.7.0
>         Environment: CentOS Linux release 7.2.1511 (Core)
>                      java version "1.7.0_80"
>            Reporter: 刘天昊
>              Labels: newbie
>         Attachments: FLUME-3032.patch
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> Test configuration:
> source - taildir
> interceptor - a custom interceptor that drops some events
> channel - any
> sink - none
>
> I found that the taildir source sleeps frequently.
> The tailFileProcess() function in TaildirSource.java breaks out of its loop
> when (events.size() < batchSize), but an interceptor may change events.size().
> I think events.size() should be captured before interceptor processing, to
> avoid unnecessary sleeps.
> {code:title=TaildirSource.java|borderStyle=solid}
>   private void tailFileProcess(TailFile tf, boolean backoffWithoutNL)
>       throws IOException, InterruptedException {
>     long receivedSize = 0;
>     while (true) {
>       reader.setCurrentFile(tf);
>       List<Event> events = reader.readEvents(batchSize, backoffWithoutNL);
>       if (events.isEmpty()) {
>         break;
>       }
>       // Record the batch size before interceptors can drop events.
>       receivedSize = events.size();
>       sourceCounter.addToEventReceivedCount(receivedSize);
>       sourceCounter.incrementAppendBatchReceivedCount();
>       try {
>         getChannelProcessor().processEventBatch(events);
>         reader.commit();
>       } catch (ChannelException ex) {
>         logger.warn("The channel is full or unexpected failure. "
>             + "The source will try again after " + retryInterval
>             + " ms");
>         TimeUnit.MILLISECONDS.sleep(retryInterval);
>         retryInterval = retryInterval << 1;
>         retryInterval = Math.min(retryInterval, maxRetryInterval);
>         continue;
>       }
>       retryInterval = 1000;
>       sourceCounter.addToEventAcceptedCount(events.size());
>       sourceCounter.incrementAppendBatchAcceptedCount();
>       // Stop only when the reader itself returned a short batch.
>       if (receivedSize < batchSize) {
>         break;
>       }
>     }
>   }
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
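
The effect described in the report can be reproduced outside Flume with a small simulation. This is only a sketch under stated assumptions, not Flume code: BatchLoopSketch, readBatch, dropOdd and drain are hypothetical names, the "interceptor" is assumed to remove events from the batch list in place, and integers stand in for events. It compares how far the loop gets when the exit check uses the list size after filtering versus the size recorded before filtering.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class BatchLoopSketch {

    /** Hypothetical stand-in for a reader: returns up to batchSize items starting at offset. */
    static List<Integer> readBatch(List<Integer> backlog, int offset, int batchSize) {
        int end = Math.min(offset + batchSize, backlog.size());
        return new ArrayList<Integer>(backlog.subList(offset, end));
    }

    /** Hypothetical stand-in for an interceptor: drops odd values from the list in place. */
    static void dropOdd(List<Integer> events) {
        for (Iterator<Integer> it = events.iterator(); it.hasNext();) {
            if (it.next() % 2 != 0) {
                it.remove();
            }
        }
    }

    /**
     * Runs the read/filter loop and returns how many backlog items were read
     * before the loop exited. When checkSizeBeforeFilter is false, the exit
     * check uses the list size after filtering (the behaviour reported);
     * when true, it uses the size recorded right after the read.
     */
    static int drain(List<Integer> backlog, int batchSize, boolean checkSizeBeforeFilter) {
        int offset = 0;
        while (true) {
            List<Integer> events = readBatch(backlog, offset, batchSize);
            if (events.isEmpty()) {
                break;
            }
            int receivedSize = events.size();
            offset += receivedSize;
            dropOdd(events); // may shrink the list
            int sizeForCheck = checkSizeBeforeFilter ? receivedSize : events.size();
            if (sizeForCheck < batchSize) {
                break;
            }
        }
        return offset;
    }

    public static void main(String[] args) {
        List<Integer> backlog = new ArrayList<Integer>();
        for (int i = 0; i < 10; i++) {
            backlog.add(i);
        }
        System.out.println("check after filter:  read " + drain(backlog, 4, false) + " of 10");
        System.out.println("check before filter: read " + drain(backlog, 4, true) + " of 10");
    }
}
```

With a backlog of 10 items and a batch size of 4, the first variant leaves the loop after a single full batch because filtering shrank the list below the batch size, while the second keeps reading until the reader returns a short batch. In the real source, leaving the loop early is what would lead to the extra sleeps the report describes.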