[ https://issues.apache.org/jira/browse/FLUME-3032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15767508#comment-15767508 ]

Attila Simon commented on FLUME-3032:
-------------------------------------

I would refactor {{tailFileProcess()}} into a separate class that receives its
dependencies through its constructor and exposes this function as part of its
public API. That class could then be unit tested by mocking its constructor
parameters and checking whether all the events were passed to the channel after
invoking {{tailFileProcess()}} once. Mocks that will be needed:
{{getChannelProcessor().processEventBatch()}} should modify the number of
{{events}} (to imitate an interceptor that drops one) and also count how many
arrived (to verify that all events arrived). I think simply mocking the
{{reader}} to produce {{events}} would be a more convenient choice than
setting up temporary files. I know this would be a relatively bigger change
compared to your existing patch, but it would improve code quality.
If you think you need help with it, I'm happy to answer your questions or
provide more details.
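A minimal sketch of the suggested refactoring, under stated assumptions: the names {{TailFileProcessor}}, {{EventReader}}, {{BatchSink}}, {{FakeReader}} and {{DroppingSink}} are hypothetical, and {{List<String>}} stands in for Flume's {{Event}}, so none of this is Flume API. Hand-written fakes replace a mocking library to keep it self-contained. The fake sink plays the role of {{processEventBatch()}}: it counts arrivals and then drops one event in place, imitating the interceptor.

{code:title=TailFileProcessorSketch.java|borderStyle=solid}
import java.util.ArrayList;
import java.util.List;

public class TailFileProcessorSketch {

    // Hypothetical stand-in for the Taildir reader (not Flume's reader class).
    interface EventReader {
        List<String> readEvents(int batchSize);
        void commit();
    }

    // Hypothetical stand-in for getChannelProcessor().processEventBatch().
    interface BatchSink {
        void processEventBatch(List<String> events);
    }

    // The extracted unit: all dependencies arrive through the constructor.
    static class TailFileProcessor {
        private final EventReader reader;
        private final BatchSink sink;
        private final int batchSize;

        TailFileProcessor(EventReader reader, BatchSink sink, int batchSize) {
            this.reader = reader;
            this.sink = sink;
            this.batchSize = batchSize;
        }

        // Drains the reader and returns the number of batches read.
        int tailFileProcess() {
            int batches = 0;
            while (true) {
                List<String> events = reader.readEvents(batchSize);
                if (events.isEmpty()) {
                    break;
                }
                batches++;
                // Record the size before the sink (interceptor) can shrink the list.
                int receivedSize = events.size();
                sink.processEventBatch(events);
                reader.commit();
                if (receivedSize < batchSize) {
                    break;
                }
            }
            return batches;
        }
    }

    // Fake reader that serves a fixed number of events in batches.
    static class FakeReader implements EventReader {
        private int remaining;
        private int next = 0;

        FakeReader(int total) {
            this.remaining = total;
        }

        public List<String> readEvents(int batchSize) {
            List<String> batch = new ArrayList<>();
            while (batch.size() < batchSize && remaining > 0) {
                batch.add("event-" + next++);
                remaining--;
            }
            return batch;
        }

        public void commit() {
            // nothing to commit in the fake
        }
    }

    // Fake sink: counts arrivals, then drops one event like a filtering interceptor.
    static class DroppingSink implements BatchSink {
        int arrived = 0;

        public void processEventBatch(List<String> events) {
            arrived += events.size();
            if (!events.isEmpty()) {
                events.remove(0); // mutates the caller's list in place
            }
        }
    }

    public static void main(String[] args) {
        FakeReader reader = new FakeReader(25);
        DroppingSink sink = new DroppingSink();
        int batches = new TailFileProcessor(reader, sink, 10).tailFileProcess();
        System.out.println("batches=" + batches + " arrived=" + sink.arrived);
    }
}
{code}

With 25 events and a batch size of 10, a single call should read three batches and deliver all 25 events to the sink, even though the sink drops one event per batch.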

Another option would be to test this functionality via the public
{{process()}} function, by mocking all the relevant objects described above
plus the ones that {{process()}} itself requires to work.

There might be other options as well; every idea is welcome.


I just double-checked, and you missed a space before '=' here:
{{long receivedSize= 0;}}
and a semicolon here: {{receivedSize = events.size()}}

> taildir source sleeps frequently.
> ---------------------------------
>
>                 Key: FLUME-3032
>                 URL: https://issues.apache.org/jira/browse/FLUME-3032
>             Project: Flume
>          Issue Type: Bug
>          Components: Sinks+Sources
>    Affects Versions: v1.7.0
>         Environment: CentOS Linux release 7.2.1511 (Core) 
> java version "1.7.0_80"
>            Reporter: Liu Tianhao
>              Labels: newbie
>         Attachments: FLUME-3032.patch
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> Test configuration:
> source - taildir
> interceptor - a custom interceptor that drops some events
> channel - any
> sink - none
> I found that the taildir source sleeps frequently.
> The tailFileProcess() function in TaildirSource.java breaks out of the loop when
> (events.size() < batchSize), but an interceptor may change events.size().
> I think the size of events from before interceptor processing should be used
> for this check, to avoid unnecessary sleeps.
> {code:title=TaildirSource.java|borderStyle=solid}
>     private void tailFileProcess(TailFile tf, boolean backoffWithoutNL)
>             throws IOException, InterruptedException {
>         long receivedSize = 0;
>         while (true) {
>             reader.setCurrentFile(tf);
>             List<Event> events = reader.readEvents(batchSize,
>                     backoffWithoutNL);
>             if (events.isEmpty()) {
>                 break;
>             }
>             receivedSize = events.size();
>             sourceCounter.addToEventReceivedCount(receivedSize);
>             sourceCounter.incrementAppendBatchReceivedCount();
>             try {
>                 getChannelProcessor().processEventBatch(events);
>                 reader.commit();
>             } catch (ChannelException ex) {
>                 logger.warn("The channel is full or unexpected failure. "
>                         + "The source will try again after " + retryInterval
>                         + " ms");
>                 TimeUnit.MILLISECONDS.sleep(retryInterval);
>                 retryInterval = retryInterval << 1;
>                 retryInterval = Math.min(retryInterval, maxRetryInterval);
>                 continue;
>             }
>             retryInterval = 1000;
>             sourceCounter.addToEventAcceptedCount(events.size());
>             sourceCounter.incrementAppendBatchAcceptedCount();
>             if (receivedSize < batchSize) {
>                 break;
>             }
>         }
>     }
> {code}
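To illustrate the reported behavior, here is a small simulation; it is not Flume code. A hypothetical {{filter}} method stands in for the custom interceptor by removing one event from each batch in place. The sketch counts how many batches one pass of the loop handles before exiting, first checking the post-interceptor {{events.size()}} and then the pre-interceptor {{receivedSize}}, as in the patch above.

{code:title=BreakConditionDemo.java|borderStyle=solid}
import java.util.ArrayList;
import java.util.List;

public class BreakConditionDemo {

    // Hypothetical interceptor stand-in: drops the first event of every batch in place.
    static void filter(List<Integer> events) {
        if (!events.isEmpty()) {
            events.remove(0);
        }
    }

    // Simulates one pass of the tail loop over `available` pending events.
    // Returns how many batches were read before the loop exited.
    static int batchesPerPass(int available, int batchSize, boolean usePreInterceptorSize) {
        int remaining = available;
        int batches = 0;
        while (true) {
            List<Integer> events = new ArrayList<>();
            while (events.size() < batchSize && remaining > 0) {
                events.add(remaining--);
            }
            if (events.isEmpty()) {
                break;
            }
            batches++;
            int receivedSize = events.size(); // size as read from the file
            filter(events);                   // the interceptor may shrink the list
            int sizeForCheck = usePreInterceptorSize ? receivedSize : events.size();
            if (sizeForCheck < batchSize) {
                break; // per the report, exiting here is what lets the source sleep
            }
        }
        return batches;
    }

    public static void main(String[] args) {
        // 1000 pending events, batch size 100
        System.out.println("post-interceptor check: " + batchesPerPass(1000, 100, false));
        System.out.println("pre-interceptor check:  " + batchesPerPass(1000, 100, true));
    }
}
{code}

With 1000 pending events and a batch size of 100, the post-interceptor check exits after the first batch (99 < 100), while the pre-interceptor check drains all 10 full batches in one pass.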



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
