wang qun created FLUME-3364:
-------------------------------

             Summary: TailDir BackOff Method always blocks the thread
                 Key: FLUME-3364
                 URL: https://issues.apache.org/jira/browse/FLUME-3364
             Project: Flume
          Issue Type: Bug
          Components: Sinks+Sources
    Affects Versions: 1.9.0
         Environment: macOS. The backoff method blocks the thread 
after the data has been read. 
            Reporter: wang qun
         Attachments: image-2020-04-26-21-43-52-084.png, 
image-2020-04-26-21-44-01-785.png

The backoff logic in the Taildir source does not work correctly because of a bug in the code.

This part should not set the default status to Status.BACKOFF:
{code:java}
// code placeholder
/**
 * Runs one polling pass over the tailed files.
 *
 * <p>Refreshes {@code existingInodes} from {@code reader.updateTailFiles()}, calls
 * {@code tailFileProcess(tf, true)} for every file whose {@code needTail()} is true, and
 * finally calls {@code closeTailFiles()}.
 *
 * @return {@code Status.BACKOFF} if anything was thrown during the pass or if no
 *     {@code tailFileProcess} call returned {@code true}; {@code Status.READY} otherwise
 */
public Status process() {
  // Back off by default; only a file that reports more pending lines flips this to READY.
  Status status = Status.BACKOFF;
  try {
    // Rebuild the inode list from scratch on every pass.
    existingInodes.clear();
    existingInodes.addAll(reader.updateTailFiles());
    for (long inode : existingInodes) {
      // NOTE(review): tf is dereferenced without a null check; presumably every inode
      // returned by updateTailFiles() is present in getTailFiles() -- verify.
      TailFile tf = reader.getTailFiles().get(inode);
      if (tf.needTail()) {
        boolean hasMoreLines = tailFileProcess(tf, true);
        if (hasMoreLines) {
          // READY is sticky for the rest of the loop: later files cannot reset it,
          // only the catch block below can.
          status = Status.READY;
        }
      }
    }
    closeTailFiles();
  } catch (Throwable t) {
    // Any failure, even after some file already reported more lines, forces BACKOFF.
    // NOTE(review): Throwable also covers InterruptedException (declared by
    // tailFileProcess); the thread's interrupt flag is not restored here.
    logger.error("Unable to tail files", t);
    sourceCounter.incrementEventReadFail();
    status = Status.BACKOFF;
  }
  return status;
}
{code}
 
{code:java}
// code placeholder
/**
 * Reads batches of events from one tailed file and hands each batch to the channel
 * processor, committing the reader after every successful delivery.
 *
 * <p>The loop ends when a read returns no events, when a delivered batch is smaller than
 * {@code batchSize}, or when {@code maxBatchCount} batches have been delivered. A
 * {@code ChannelException} does not end the loop: the method sleeps for
 * {@code retryInterval} ms, doubles that interval (capped at {@code maxRetryInterval})
 * and reads again.
 *
 * @param tf the file to read; set as the reader's current file on every iteration
 * @param backoffWithoutNL passed through unchanged to {@code reader.readEvents}; its
 *     exact meaning is defined by the reader and is not visible in this excerpt
 * @return {@code true} only when {@code maxBatchCount} batches were delivered in this
 *     call; {@code false} when a read was empty or a batch was smaller than
 *     {@code batchSize}
 * @throws IOException declared by the signature; presumably raised by the reader calls --
 *     confirm against the reader implementation
 * @throws InterruptedException if the retry sleep is interrupted
 */
private boolean tailFileProcess(TailFile tf, boolean backoffWithoutNL)
    throws IOException, InterruptedException {
  // Number of batches successfully delivered during this call.
  long batchCount = 0;
  while (true) {
    reader.setCurrentFile(tf);
    List<Event> events = reader.readEvents(batchSize, backoffWithoutNL);
    if (events.isEmpty()) {
      // Nothing was read: report "no more lines" to the caller.
      return false;
    }
    // The "received" counters are bumped before delivery is attempted, so an iteration
    // that ends in the retry path below has already been counted here.
    sourceCounter.addToEventReceivedCount(events.size());
    sourceCounter.incrementAppendBatchReceivedCount();
    try {
      getChannelProcessor().processEventBatch(events);
      // Commit only after the channel processor accepted the batch.
      reader.commit();
    } catch (ChannelException ex) {
      logger.warn("The channel is full or unexpected failure. " +
          "The source will try again after " + retryInterval + " ms");
      sourceCounter.incrementChannelWriteFail();
      // Wait, then double the delay for the next failure, capped at maxRetryInterval.
      TimeUnit.MILLISECONDS.sleep(retryInterval);
      retryInterval = retryInterval << 1;
      retryInterval = Math.min(retryInterval, maxRetryInterval);
      // NOTE(review): commit() was skipped, so the next readEvents() presumably returns
      // the same events again -- verify against the reader implementation.
      continue;
    }
    // Successful delivery: reset the retry delay to 1000 ms.
    retryInterval = 1000;
    sourceCounter.addToEventAcceptedCount(events.size());
    sourceCounter.incrementAppendBatchAcceptedCount();
    if (events.size() < batchSize) {
      // A short batch ends the loop and is reported as "no more lines".
      logger.debug("The events taken from " + tf.getPath() + " is less than " + 
batchSize);
      return false;
    }
    if (++batchCount >= maxBatchCount) {
      // Batch limit for one call reached; true is the only result that makes the
      // process() method quoted in this report return READY.
      logger.debug("The batches read from the same file is larger than " + 
maxBatchCount );
      return true;
    }
  }
}
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to