[ https://issues.apache.org/jira/browse/FLUME-3032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15767508#comment-15767508 ]
Attila Simon commented on FLUME-3032:
-------------------------------------

I would refactor {{tailFileProcess()}} into a separate class that receives its dependencies via its constructor and exposes this function as part of its public API. That class could then be unit tested by mocking its constructor parameters and checking that all the events were passed to the channel after invoking {{tailFileProcess()}} just once.

Mocks that will be needed: {{getChannelProcessor().processEventBatch()}} should modify the number of {{events}} (to imitate an interceptor that dropped one) and also count how many arrived (to verify that all events arrived). I think mocking the {{reader}} to simply produce {{events}} would be a more convenient choice than setting up temporary files.

I know this would be a relatively bigger change compared to your existing patch, but it would improve code quality. If you think you need help with it, I'm happy to answer your questions or provide more details.

Another option would be to test this functionality via the public {{process()}} function by mocking all the relevant objects as described above, plus the ones required by {{process()}} itself to work. There might be other options too; every idea is welcome.

I just double-checked, and you missed a space before '=' here: {{long receivedSize= 0;}} and a semicolon here: {{receivedSize = events.size()}}

> taildir source sleeps frequently.
> ---------------------------------
>
> Key: FLUME-3032
> URL: https://issues.apache.org/jira/browse/FLUME-3032
> Project: Flume
> Issue Type: Bug
> Components: Sinks+Sources
> Affects Versions: v1.7.0
> Environment: CentOS Linux release 7.2.1511 (Core)
> java version "1.7.0_80"
> Reporter: Liu Tianhao
> Labels: newbie
> Attachments: FLUME-3032.patch
>
> Original Estimate: 1h
> Remaining Estimate: 1h
>
> Test configuration.
> source - taildir
> interceptor - The custom interceptor drops some events
> channel - any
> sink - none
> I found that the taildir source sleeps frequently.
> The tailFileProcess() function in TaildirSource.java breaks out of the loop when
> (events.size() < batchSize), but an interceptor may change events.size().
> I think events.size() should be read before interceptor processing,
> to avoid unnecessary sleep.
> {code:title=TaildirSource.java|borderStyle=solid}
>   private void tailFileProcess(TailFile tf, boolean backoffWithoutNL)
>       throws IOException, InterruptedException {
>     long receivedSize = 0;
>     while (true) {
>       reader.setCurrentFile(tf);
>       List<Event> events = reader.readEvents(batchSize, backoffWithoutNL);
>       if (events.isEmpty()) {
>         break;
>       }
>       receivedSize = events.size();
>       sourceCounter.addToEventReceivedCount(receivedSize);
>       sourceCounter.incrementAppendBatchReceivedCount();
>       try {
>         getChannelProcessor().processEventBatch(events);
>         reader.commit();
>       } catch (ChannelException ex) {
>         logger.warn("The channel is full or unexpected failure. "
>             + "The source will try again after " + retryInterval
>             + " ms");
>         TimeUnit.MILLISECONDS.sleep(retryInterval);
>         retryInterval = retryInterval << 1;
>         retryInterval = Math.min(retryInterval, maxRetryInterval);
>         continue;
>       }
>       retryInterval = 1000;
>       sourceCounter.addToEventAcceptedCount(events.size());
>       sourceCounter.incrementAppendBatchAcceptedCount();
>       if (receivedSize < batchSize) {
>         break;
>       }
>     }
>   }
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
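
For illustration, the test approach suggested in the comment above could look roughly like the sketch below. This is only a sketch under stated assumptions: {{EventReader}}, {{BatchSink}}, {{TailFileProcessor}}, {{FakeReader}} and {{DroppingSink}} are hypothetical stand-ins written for this example, not Flume classes, events are plain strings, and the {{ChannelException}} retry handling and counters are left out. Hand-written fakes are used in place of a mocking library so that the example has no dependencies.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the reader dependency (not the Flume API).
interface EventReader {
    List<String> readEvents(int batchSize);
    void commit();
}

// Hypothetical stand-in for getChannelProcessor().processEventBatch().
interface BatchSink {
    void processEventBatch(List<String> events);
}

// Hypothetical extracted class: dependencies arrive via the constructor.
class TailFileProcessor {
    private final EventReader reader;
    private final BatchSink sink;
    private final int batchSize;

    TailFileProcessor(EventReader reader, BatchSink sink, int batchSize) {
        this.reader = reader;
        this.sink = sink;
        this.batchSize = batchSize;
    }

    // Reads batches until the reader runs dry; returns the number of events read.
    long tailFileProcess() {
        long total = 0;
        while (true) {
            List<String> events = reader.readEvents(batchSize);
            if (events.isEmpty()) {
                break;
            }
            // Capture the size BEFORE the sink (interceptor) can shrink the list.
            int receivedSize = events.size();
            total += receivedSize;
            sink.processEventBatch(events);
            reader.commit();
            if (receivedSize < batchSize) {
                break;
            }
        }
        return total;
    }
}

// Fake reader that produces a fixed number of events, so no temporary files are needed.
class FakeReader implements EventReader {
    private int remaining;
    int commits = 0;

    FakeReader(int totalEvents) {
        this.remaining = totalEvents;
    }

    public List<String> readEvents(int max) {
        int n = Math.min(max, remaining);
        List<String> batch = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            batch.add("event-" + (remaining - i));
        }
        remaining -= n;
        return batch;
    }

    public void commit() {
        commits++;
    }
}

// Fake sink that counts arrivals, then drops one event to imitate an interceptor.
class DroppingSink implements BatchSink {
    long arrived = 0;
    int batches = 0;

    public void processEventBatch(List<String> events) {
        arrived += events.size();
        batches++;
        events.remove(0);
    }
}

class TailFileProcessDemo {
    public static void main(String[] args) {
        FakeReader reader = new FakeReader(250);
        DroppingSink sink = new DroppingSink();
        long read = new TailFileProcessor(reader, sink, 100).tailFileProcess();
        System.out.println("read=" + read + " arrived=" + sink.arrived
            + " batches=" + sink.batches);
    }
}
```

With 250 events and a batch size of 100, a single call to {{tailFileProcess()}} should drain the reader in three batches even though the fake sink shrinks every batch. If the loop compared the post-sink {{events.size()}} against the batch size instead, it would stop after the first batch, which is the behaviour reported in this issue.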