wang qun created FLUME-3364:
-------------------------------
Summary: TailDir BackOff Method always blocks the thread
Key: FLUME-3364
URL: https://issues.apache.org/jira/browse/FLUME-3364
Project: Flume
Issue Type: Bug
Components: Sinks+Sources
Affects Versions: 1.9.0
Environment: macOS. The backoff method blocks the thread
even when data has been read.
Reporter: wang qun
Attachments: image-2020-04-26-21-43-52-084.png,
image-2020-04-26-21-44-01-785.png
The backoff logic in the Taildir source does not work correctly because of a code bug.
This part should not set the default status to false:
{code:java}
// Code placeholder
/**
 * Runs one pass over the files known to {@code reader} and reports the
 * resulting {@code Status}.
 *
 * @return {@code Status.READY} if {@code tailFileProcess} returned
 *         {@code true} for at least one file during this pass;
 *         {@code Status.BACKOFF} otherwise, including whenever a
 *         {@code Throwable} was caught
 */
public Status process() {
// Default result: stays BACKOFF unless a file below reports more lines.
Status status = Status.BACKOFF;
try {
// Replace the contents of existingInodes with whatever
// reader.updateTailFiles() returns for this pass.
existingInodes.clear();
existingInodes.addAll(reader.updateTailFiles());
for (long inode : existingInodes) {
TailFile tf = reader.getTailFiles().get(inode);
if (tf.needTail()) {
boolean hasMoreLines = tailFileProcess(tf, true);
// Only a true return flips the status. In the tailFileProcess quoted
// further down, true is returned only once maxBatchCount batches have
// been read, so a pass that delivered events but ended on a short
// batch still leaves the status at BACKOFF.
if (hasMoreLines) {
status = Status.READY;
}
}
}
closeTailFiles();
} catch (Throwable t) {
// Any failure is logged and counted, and the pass ends in BACKOFF even if
// an earlier file had already set READY.
logger.error("Unable to tail files", t);
sourceCounter.incrementEventReadFail();
status = Status.BACKOFF;
}
return status;
}
{code}
{code:java}
// Code placeholder
/**
 * Repeatedly reads batches of up to {@code batchSize} events from {@code tf}
 * and hands each batch to the channel processor, until the file yields an
 * empty or short batch or {@code maxBatchCount} batches have been delivered.
 *
 * @param tf the file to read from; made the reader's current file on every
 *        loop iteration
 * @param backoffWithoutNL passed through unchanged to
 *        {@code reader.readEvents}
 * @return {@code true} only when the number of delivered batches reaches
 *         {@code maxBatchCount}; {@code false} when {@code readEvents}
 *         returns no events or fewer than {@code batchSize} events
 * @throws IOException declared by this method; presumably raised by the
 *         reader calls (not visible in this snippet)
 * @throws InterruptedException if the thread is interrupted while sleeping
 *         before a retry
 */
private boolean tailFileProcess(TailFile tf, boolean backoffWithoutNL)
throws IOException, InterruptedException {
long batchCount = 0;
while (true) {
reader.setCurrentFile(tf);
List<Event> events = reader.readEvents(batchSize, backoffWithoutNL);
// Exit 1: nothing was read, so the caller is told there are no more lines.
if (events.isEmpty()) {
return false;
}
sourceCounter.addToEventReceivedCount(events.size());
sourceCounter.incrementAppendBatchReceivedCount();
try {
getChannelProcessor().processEventBatch(events);
// Reached only when processEventBatch did not throw.
reader.commit();
} catch (ChannelException ex) {
logger.warn("The channel is full or unexpected failure. " +
"The source will try again after " + retryInterval + " ms");
sourceCounter.incrementChannelWriteFail();
// Sleep for the current interval, then double it (capped at
// maxRetryInterval) for the next failure.
TimeUnit.MILLISECONDS.sleep(retryInterval);
retryInterval = retryInterval << 1;
retryInterval = Math.min(retryInterval, maxRetryInterval);
// Restart the loop without committing: the file is set and read again.
// NOTE(review): presumably the reader re-serves the uncommitted events;
// confirm against the reader implementation.
continue;
}
// A batch was accepted: reset the retry interval to 1000 ms.
retryInterval = 1000;
sourceCounter.addToEventAcceptedCount(events.size());
sourceCounter.incrementAppendBatchAcceptedCount();
// Exit 2: a short batch was delivered, yet false is returned, the same
// value as when nothing was read at all.
if (events.size() < batchSize) {
logger.debug("The events taken from " + tf.getPath() + " is less than " +
batchSize);
return false;
}
// Exit 3: the only path that returns true. The check is >=, although the
// debug message below says "larger than".
if (++batchCount >= maxBatchCount) {
logger.debug("The batches read from the same file is larger than " +
maxBatchCount );
return true;
}
}
}
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]