[ https://issues.apache.org/jira/browse/FLUME-3032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15758779#comment-15758779 ]
Liu Tianhao edited comment on FLUME-3032 at 12/19/16 9:21 AM:
--------------------------------------------------------------

Sorry, I do not know how to design a junit test to cover this change. This is just to avoid unnecessary sleep, could you give me a little help? Thank you very much!!!

was (Author: liutianhao):
Sorry, I do not know how to design a junit test to cover this change. This is just to avoid unnecessary sleep, can you give me a little help? Thank you very much!!!

> taildir source sleeps frequently.
> ---------------------------------
>
>                 Key: FLUME-3032
>                 URL: https://issues.apache.org/jira/browse/FLUME-3032
>             Project: Flume
>          Issue Type: Bug
>          Components: Sinks+Sources
>    Affects Versions: v1.7.0
>         Environment: CentOS Linux release 7.2.1511 (Core)
>                      java version "1.7.0_80"
>            Reporter: Liu Tianhao
>              Labels: newbie
>         Attachments: FLUME-3032.patch
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> Test configuration:
> source - taildir
> interceptor - a custom interceptor that drops some events
> channel - any
> sink - none
> I found that the taildir source sleeps frequently.
> The tailFileProcess() function in TaildirSource.java breaks out of its loop when
> (events.size() < batchSize), but an interceptor may change events.size().
> I think events.size() should be read before interceptor processing, to
> avoid unnecessary sleep.
> {code:title=TaildirSource.java|borderStyle=solid}
>   private void tailFileProcess(TailFile tf, boolean backoffWithoutNL)
>       throws IOException, InterruptedException {
>     long receivedSize = 0;
>     while (true) {
>       reader.setCurrentFile(tf);
>       List<Event> events = reader.readEvents(batchSize, backoffWithoutNL);
>       if (events.isEmpty()) {
>         break;
>       }
>       receivedSize = events.size();
>       sourceCounter.addToEventReceivedCount(receivedSize);
>       sourceCounter.incrementAppendBatchReceivedCount();
>       try {
>         getChannelProcessor().processEventBatch(events);
>         reader.commit();
>       } catch (ChannelException ex) {
>         logger.warn("The channel is full or unexpected failure. "
>             + "The source will try again after " + retryInterval + " ms");
>         TimeUnit.MILLISECONDS.sleep(retryInterval);
>         retryInterval = retryInterval << 1;
>         retryInterval = Math.min(retryInterval, maxRetryInterval);
>         continue;
>       }
>       retryInterval = 1000;
>       sourceCounter.addToEventAcceptedCount(events.size());
>       sourceCounter.incrementAppendBatchAcceptedCount();
>       if (receivedSize < batchSize) {
>         break;
>       }
>     }
>   }
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
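
On the question of how a test could cover this change, one possible starting point is to reproduce the loop-exit condition in isolation, without any Flume classes. The sketch below is a simplified model and not the Flume implementation: `TailLoopSketch`, `readEvents`, `dropInterceptor`, `sampleFile` and `tail` are hypothetical names, events are plain strings, and it assumes the interceptor removes events from the batch list in place. It compares how many lines are consumed in one pass when the exit check uses the size after interception versus the size read before it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Simplified model of the tailFileProcess() loop; no Flume classes are used.
class TailLoopSketch {
    static final int BATCH_SIZE = 4;

    // Hypothetical stand-in for a custom interceptor that drops events in place.
    static void dropInterceptor(List<String> events) {
        Iterator<String> it = events.iterator();
        while (it.hasNext()) {
            if (it.next().startsWith("drop")) {
                it.remove();
            }
        }
    }

    // Hypothetical stand-in for reader.readEvents(): up to batchSize lines from pos.
    static List<String> readEvents(List<String> file, int pos, int batchSize) {
        int end = Math.min(file.size(), pos + batchSize);
        return new ArrayList<>(file.subList(pos, end));
    }

    // Ten lines; exactly one line in the first batch is dropped by the interceptor.
    static List<String> sampleFile() {
        return Arrays.asList("keep0", "drop1", "keep2", "keep3", "keep4",
                             "keep5", "keep6", "keep7", "keep8", "keep9");
    }

    // Returns how many lines were consumed before the loop exited.
    static int tail(List<String> file, boolean sizeBeforeInterceptor) {
        int pos = 0;
        while (true) {
            List<String> events = readEvents(file, pos, BATCH_SIZE);
            if (events.isEmpty()) {
                break;
            }
            int receivedSize = events.size(); // size before interception
            pos += receivedSize;
            dropInterceptor(events);          // may shrink the list
            int compared = sizeBeforeInterceptor ? receivedSize : events.size();
            if (compared < BATCH_SIZE) {
                break;                        // caller would sleep before the next pass
            }
        }
        return pos;
    }

    public static void main(String[] args) {
        List<String> file = sampleFile();
        System.out.println("size after interceptor:  " + tail(file, false)); // 4
        System.out.println("size before interceptor: " + tail(file, true));  // 10
    }
}
```

In this model the first variant stops after the first batch even though six more lines are waiting, while the second drains all ten lines in one pass. A real JUnit test against TaildirSource would need an actual channel, interceptor and temporary files, which this sketch does not attempt to show.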