> On Feb. 28, 2017, 2:24 p.m., Balázs Donát Bessenyei wrote:
> > flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java, line 196
> > <https://reviews.apache.org/r/50438/diff/2/?file=1638339#file1638339line196>
> >
> >     Can the pattern be cached? (E.g. in configure)

OK, I will fix it.


> On Feb. 28, 2017, 2:24 p.m., Balázs Donát Bessenyei wrote:
> > flume-ng-doc/sphinx/FlumeUserGuide.rst, line 1191
> > <https://reviews.apache.org/r/50438/diff/2/?file=1638337#file1638337line1191>
> >
> >     Can you please clarify this setting?

It is explained in the functions readMultilineEventPre() and readMultilineEventNext(), and you can see the effect in TestTaildirSource.

previous: If the line does not match, it is not part of the previous event. If the buffer event is not null, a new event is created from the buffer event's body, and the current line becomes the start of a fresh buffer event.

next: If the line does not match, it is not part of the next event. The current line is merged into the buffer event, and a new event is created from the merged body.

private Event readMultilineEventPre(LineResult line, boolean match, Pattern pattern)
    throws IOException {
  Event event = null;
  Matcher m = pattern.matcher(new String(line.line));
  boolean find = m.find();
  match = (find && match) || (!find && !match);
  byte[] lineBytes = toOriginBytes(line);
  if (match) {
    /** If matched, merge it to the buffer event. */
    mergeEvent(line);
  } else {
    /**
     * If not matched, this line is not part of previous event when the buffer event is not null.
     * Then create a new event with buffer event's message and put the current line into the
     * cleared buffer event.
     */
    if (bufferEvent != null) {
      event = EventBuilder.withBody(bufferEvent.getBody());
    }
    bufferEvent = null;
    bufferEvent = EventBuilder.withBody(lineBytes);
    if (line.lineSepInclude) {
      bufferEvent.getHeaders().put("lineCount", "1");
    }
    long now = System.currentTimeMillis();
    bufferEvent.getHeaders().put(TimestampInterceptor.Constants.TIMESTAMP, Long.toString(now));
  }
  return event;
}

private Event readMultilineEventNext(LineResult line, boolean match, Pattern pattern)
    throws IOException {
  Event event = null;
  Matcher m = pattern.matcher(new String(line.line));
  boolean find = m.find();
  match = (find && match) || (!find && !match);
  if (match) {
    /** If matched, merge it to the buffer event. */
    mergeEvent(line);
  } else {
    /**
     * If not matched, this line is not part of next event. Then merge the current line into the
     * buffer event and create a new event with the merged message.
     */
    mergeEvent(line);
    event = EventBuilder.withBody(bufferEvent.getBody());
    bufferEvent = null;
  }
  return event;
}


> On Feb. 28, 2017, 2:24 p.m., Balázs Donát Bessenyei wrote:
> > flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java, line 115
> > <https://reviews.apache.org/r/50438/diff/2/?file=1638339#file1638339line115>
> >
> >     Isn't maxBytes and maxLines missing here?

No, the maxBytes and maxLines logic comes after readLine() in readEvents(). More precisely, the function should be called needFlushTimeoutEvent(). I will fix it ASAP.

public List<Event> readEvents(int numEvents, boolean backoffWithoutNL, boolean addByteOffset)
    throws IOException {
  List<Event> events = Lists.newLinkedList();
  if (this.multiline) {
    boolean match = this.multilinePatternMatched;
    Pattern pattern = Pattern.compile(this.multilinePattern);
    while (events.size() < numEvents) {
      LineResult line = readLine();
      if (line == null) {
        break;
      }
      Event event = null;
      switch (this.multilinePatternBelong) {
        case "next":
          event = readMultilineEventNext(line, match, pattern);
          break;
        case "previous":
          event = readMultilineEventPre(line, match, pattern);
          break;
        default:
          break;
      }
      if (event != null) {
        events.add(event);
      }
      if (bufferEvent != null) {
        if (bufferEvent.getBody().length >= multilineMaxBytes
            || Integer.parseInt(bufferEvent.getHeaders().get("lineCount")) == multilineMaxLines) {
          flushBufferEvent(events);
        }
      }
    }
    if (isNeedFlushBufferEvent()) {
      flushBufferEvent(events);
    }
  } else {
    ...
  }


- qiao


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/50438/#review167074
-----------------------------------------------------------


On Feb. 17, 2017, 11:13 a.m., qiao wen wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/50438/
> -----------------------------------------------------------
>
> (Updated Feb. 17, 2017, 11:13 a.m.)
>
>
> Review request for Flume.
>
>
> Repository: flume-git
>
>
> Description
> -------
>
> TaildirSource defaults to LINE, which is a problem for multiline log events
> such as stack traces and request/response dumps. The following is a Java
> stack trace log. We expect a regex key that marks the start of a log line,
> so that all log lines are aggregated until the next regex key is found.
>
> 2016-07-16 14:59:43,956 ERROR lifecycleSupervisor-1-7 LifecycleSupervisor.run - Unable to start EventDrivenSourceRunner: { source:cn.yottabyte.flume.source.http.HTTPSource{name:sourceHttp,state:IDLE} } - Exception follows.
> java.lang.IllegalStateException: Running HTTP Server found in source: sourceHttp before I started one. Will not attempt to start.
> at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
> at cn.yottabyte.flume.source.http.HTTPSource.startHttpSourceServer(HTTPSource.java:170)
> at cn.yottabyte.flume.source.http.HTTPSource.start(HTTPSource.java:166)
> at org.apache.flume.source.EventDrivenSourceRunner.start(EventDrivenSourceRunner.java:44)
> at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
>
> Diffs
> -----
>
>   flume-ng-doc/sphinx/FlumeUserGuide.rst afa6625
>   flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java 8838320
>   flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TailFile.java 42474c4
>   flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java a107a01
>   flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java f2347f3
>   flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java 097ee0b
>
> Diff: https://reviews.apache.org/r/50438/diff/
>
>
> Testing
> -------
>
> All tests in TestTaildirSource passed.
>
>
> Thanks,
>
> qiao wen
>
>
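
For reference, the previous/next rules and the pattern caching discussed in this thread can be sketched in isolation. This is not the code in the patch: the class MultilineSketch and its methods merge() and group() are hypothetical names, events are plain strings rather than Flume Event objects, and the leftover buffer is flushed at end of input as a stand-in for the timeout, maxBytes, and maxLines flushing in TailFile. The pattern is compiled once in the constructor, which is roughly what caching it in configure would amount to.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical, simplified model of the multiline grouping; not the patch code. */
class MultilineSketch {
  // Compiled once here instead of on every read call.
  private final Pattern pattern;
  // Mirrors multilinePatternMatched: whether a regex hit means "merge this line".
  private final boolean matched;
  // Mirrors multilinePatternBelong: "previous" or "next".
  private final String belong;
  // Plays the role of bufferEvent; null means no pending event.
  private StringBuilder buffer;

  MultilineSketch(String regex, boolean matched, String belong) {
    this.pattern = Pattern.compile(regex);
    this.matched = matched;
    this.belong = belong;
  }

  // Append the line to the pending event, creating it if needed.
  private void merge(String line) {
    if (buffer == null) {
      buffer = new StringBuilder(line);
    } else {
      buffer.append('\n').append(line);
    }
  }

  /** Groups raw lines into multiline events (assumption: flush at end of input). */
  List<String> group(List<String> lines) {
    List<String> events = new ArrayList<>();
    for (String line : lines) {
      boolean find = pattern.matcher(line).find();
      // Same truth table as (find && match) || (!find && !match).
      boolean merges = (find == matched);
      if (merges) {
        merge(line);
      } else if ("previous".equals(belong)) {
        // The line starts a new event: emit the pending one, then buffer the line.
        if (buffer != null) {
          events.add(buffer.toString());
        }
        buffer = new StringBuilder(line);
      } else {
        // "next": the line closes the pending event: merge it, then emit.
        merge(line);
        events.add(buffer.toString());
        buffer = null;
      }
    }
    if (buffer != null) {
      events.add(buffer.toString());
      buffer = null;
    }
    return events;
  }
}
```

With a timestamp regex such as ^\d{4}-\d{2}-\d{2}, matched set to false, and belong set to "previous", a stack trace like the one in the description stays attached to the log line that precedes it, and the next timestamped line starts a new event.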