> On 二月 28, 2017, 2:24 p.m., Balázs Donát Bessenyei wrote:
> > flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java,
> >  line 196
> > <https://reviews.apache.org/r/50438/diff/2/?file=1638339#file1638339line196>
> >
> >     Can the pattern be cached? (Eg. in configure)

OK, I will fix it.


> On 二月 28, 2017, 2:24 p.m., Balázs Donát Bessenyei wrote:
> > flume-ng-doc/sphinx/FlumeUserGuide.rst, line 1191
> > <https://reviews.apache.org/r/50438/diff/2/?file=1638337#file1638337line1191>
> >
> >     Can you please clarify this setting?

It's explained in the function readMultilineEventPre() and 
readMultilineEventNext(). And you can see the effect in TestTaildirSource.

Previous:   If not matched, this line is not part of previous event when the 
buffer event is not null.
               Then create a new event with buffer event's message and put the 
current line into the
               cleared buffer event.
Next:        If not matched, this line is not part of next event. Then merge 
the current line into the
               buffer event and create a new event with the merged message.

private Event readMultilineEventPre(LineResult line, boolean match, Pattern 
pattern)
          throws IOException {
    Event event = null;
    Matcher m = pattern.matcher(new String(line.line));
    boolean find = m.find();
    match = (find && match) || (!find && !match);
    byte[] lineBytes = toOriginBytes(line);
    if (match) {
      /** If matched, merge it to the buffer event. */
      mergeEvent(line);
    } else {
      /**
       * If not matched, this line is not part of previous event when the 
buffer event is not null.
       * Then create a new event with buffer event's message and put the 
current line into the
       * cleared buffer event.
       */
      if (bufferEvent != null) {
        event = EventBuilder.withBody(bufferEvent.getBody());
      }
      bufferEvent = null;
      bufferEvent = EventBuilder.withBody(lineBytes);
      if (line.lineSepInclude) {
        bufferEvent.getHeaders().put("lineCount", "1");
      }
      long now = System.currentTimeMillis();
      bufferEvent.getHeaders().put(TimestampInterceptor.Constants.TIMESTAMP, 
Long.toString(now));
    }
    return event;
  }

  private Event readMultilineEventNext(LineResult line, boolean match, Pattern 
pattern)
          throws IOException {
    Event event = null;
    Matcher m = pattern.matcher(new String(line.line));
    boolean find = m.find();
    match = (find && match) || (!find && !match);
    if (match) {
      /** If matched, merge it to the buffer event. */
      mergeEvent(line);
    } else {
      /**
       * If not matched, this line is not part of next event. Then merge the 
current line into the
       * buffer event and create a new event with the merged message.
       */
      mergeEvent(line);
      event = EventBuilder.withBody(bufferEvent.getBody());
      bufferEvent = null;
    }
    return event;
  }


> On 二月 28, 2017, 2:24 p.m., Balázs Donát Bessenyei wrote:
> > flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java,
> >  line 115
> > <https://reviews.apache.org/r/50438/diff/2/?file=1638339#file1638339line115>
> >
> >     Isn't maxBytes and maxLines missing here?

No, maxBytes and maxLines logic is after readline() in readEvents(). Precisely 
the function should be called needFlushTimeoutEvent(). I will fix it ASAP.

public List<Event> readEvents(int numEvents, boolean backoffWithoutNL,
      boolean addByteOffset) throws IOException {
    List<Event> events = Lists.newLinkedList();
    if (this.multiline) {
      boolean match = this.multilinePatternMatched;
      Pattern pattern = Pattern.compile(this.multilinePattern);
      while (events.size() < numEvents) {
        LineResult line = readLine();
        if (line == null) {
          break;
        }
        Event event = null;
        switch (this.multilinePatternBelong) {
          case "next":
            event = readMultilineEventNext(line, match, pattern);
            break;
          case "previous":
            event = readMultilineEventPre(line, match, pattern);
            break;
          default:
            break;
        }
        if (event != null) {
          events.add(event);
        }
        if (bufferEvent != null) {
          if (bufferEvent.getBody().length >= multilineMaxBytes
              || Integer.parseInt(bufferEvent.getHeaders().get("lineCount")) == 
multilineMaxLines) {
            flushBufferEvent(events);
          }
        }
      }
      if (isNeedFlushBufferEvent()) {
        flushBufferEvent(events);
      }
    } else {
    ...
    }


- qiao


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/50438/#review167074
-----------------------------------------------------------


On 二月 17, 2017, 11:13 a.m., qiao wen wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/50438/
> -----------------------------------------------------------
> 
> (Updated 二月 17, 2017, 11:13 a.m.)
> 
> 
> Review request for Flume.
> 
> 
> Repository: flume-git
> 
> 
> Description
> -------
> 
> TaidirSource defaults to LINE, this has issue when multiline log events like 
> stack traces and have request/responses. Following part is Java traceback 
> logs. We expect to have log line start regex Key to aggregate all the log 
> lines till the next regex key is found.
> 2016-07-16 14:59:43,956 ERROR lifecycleSupervisor-1-7 LifecycleSupervisor.run 
> - Unable to start EventDrivenSourceRunner: { 
> source:cn.yottabyte.flume.source.http.HTTPSource{name:sourceHttp,state:IDLE} 
> } - Exception follows.
> java.lang.IllegalStateException: Running HTTP Server found in source: 
> sourceHttp before I started one. Will not attempt to start.
>     at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
>     at 
> cn.yottabyte.flume.source.http.HTTPSource.startHttpSourceServer(HTTPSource.java:170)
>     at cn.yottabyte.flume.source.http.HTTPSource.start(HTTPSource.java:166)
>     at 
> org.apache.flume.source.EventDrivenSourceRunner.start(EventDrivenSourceRunner.java:44)
>     at 
> org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>     at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>     at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> 
> 
> Diffs
> -----
> 
>   flume-ng-doc/sphinx/FlumeUserGuide.rst afa6625 
>   
> flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java
>  8838320 
>   
> flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java
>  42474c4 
>   
> flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java
>  a107a01 
>   
> flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java
>  f2347f3 
>   
> flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java
>  097ee0b 
> 
> Diff: https://reviews.apache.org/r/50438/diff/
> 
> 
> Testing
> -------
> 
> All tests in TestTaildirSource passed.
> 
> 
> Thanks,
> 
> qiao wen
> 
>

Reply via email to