Author: jarcec
Date: Tue Jul 24 02:08:04 2012
New Revision: 1364869
URL: http://svn.apache.org/viewvc?rev=1364869&view=rev
Log:
FLUME-1358. Add a regex-based filtering interceptor.
(Patrick Wendell via Jarek Jarcec Cecho)
Added:
flume/trunk/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexFilteringInterceptor.java
flume/trunk/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexFilteringInterceptor.java
Modified:
flume/trunk/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java
flume/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst
Modified:
flume/trunk/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java
URL:
http://svn.apache.org/viewvc/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java?rev=1364869&r1=1364868&r2=1364869&view=diff
==============================================================================
---
flume/trunk/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java
(original)
+++
flume/trunk/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java
Tue Jul 24 02:08:04 2012
@@ -22,7 +22,8 @@ public enum InterceptorType {
TIMESTAMP(org.apache.flume.interceptor.TimestampInterceptor.Builder.class),
HOST(org.apache.flume.interceptor.HostInterceptor.Builder.class),
- STATIC(org.apache.flume.interceptor.StaticInterceptor.Builder.class)
+ STATIC(org.apache.flume.interceptor.StaticInterceptor.Builder.class),
+
REGEX_FILTER(org.apache.flume.interceptor.RegexFilteringInterceptor.Builder.class)
;
private final Class<? extends Interceptor.Builder> builderClass;
Added:
flume/trunk/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexFilteringInterceptor.java
URL:
http://svn.apache.org/viewvc/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexFilteringInterceptor.java?rev=1364869&view=auto
==============================================================================
---
flume/trunk/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexFilteringInterceptor.java
(added)
+++
flume/trunk/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexFilteringInterceptor.java
Tue Jul 24 02:08:04 2012
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.interceptor;
+
+import static
org.apache.flume.interceptor.RegexFilteringInterceptor.Constants.DEFAULT_EXCLUDE_EVENTS;
+import static
org.apache.flume.interceptor.RegexFilteringInterceptor.Constants.DEFAULT_REGEX;
+import static
org.apache.flume.interceptor.RegexFilteringInterceptor.Constants.EXCLUDE_EVENTS;
+import static
org.apache.flume.interceptor.RegexFilteringInterceptor.Constants.REGEX;
+
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Interceptor that filters events selectively based on a configured regular
+ * expression matching against the event body.
+ *
+ * This supports either include- or exclude-based filtering. A given
+ * interceptor can only perform one of these functions, but multiple
+ * interceptors can be chained together to create more complex
+ * inclusion/exclusion patterns. If include-based filtering is configured, then
+ * all events matching the supplied regular expression will be passed through
+ * and all events not matching will be ignored. If exclude-based filtering is
+ * configured, then all events matching will be ignored, and all other events
+ * will pass through.
+ *
+ * Note that all regular expression matching occurs through Java's built in
+ * java.util.regex package.
+ *
+ * Properties:<p>
+ *
+ * regex: Regular expression to match against the event body.
+ * (default is ".*")<p>
+ *
+ * excludeEvents: If true, a regex match determines events to exclude,
+ * otherwise a regex determines events to include
+ * (default is false)<p>
+ *
+ * Sample config:<p>
+ *
+ * <code>
+ * agent.sources.r1.channels = c1<p>
+ * agent.sources.r1.type = SEQ<p>
+ * agent.sources.r1.interceptors = i1<p>
+ * agent.sources.r1.interceptors.i1.type = REGEX_FILTER<p>
+ * agent.sources.r1.interceptors.i1.regex = (WARNING)|(ERROR)|(FATAL)<p>
+ * </code>
+ *
+ */
+public class RegexFilteringInterceptor implements Interceptor {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(RegexFilteringInterceptor.class);
+
+  /**
+   * Charset used to decode event bodies before matching. Fixed to UTF-8 so
+   * that filtering does not depend on the platform default charset.
+   */
+  private static final Charset BODY_CHARSET = Charset.forName("UTF-8");
+
+  /** Compiled pattern searched for in each event body. */
+  private final Pattern regex;
+
+  /** When true, matching events are dropped; otherwise they are kept. */
+  private final boolean excludeEvents;
+
+  /**
+   * Only {@link RegexFilteringInterceptor.Builder} can build me
+   */
+  private RegexFilteringInterceptor(Pattern regex, boolean excludeEvents) {
+    this.regex = regex;
+    this.excludeEvents = excludeEvents;
+  }
+
+  @Override
+  public void initialize() {
+    // no-op
+  }
+
+  /**
+   * Returns the event if it passes the regular expression filter and null
+   * otherwise.
+   *
+   * The pattern is applied with {@link java.util.regex.Matcher#find()}, so
+   * it matches when it occurs anywhere in the body, not only when it spans
+   * the whole body.
+   *
+   * @param event event whose body is decoded as text and tested
+   * @return the same event instance if it passes the filter, else null
+   */
+  @Override
+  public Event intercept(Event event) {
+    boolean matched =
+        regex.matcher(new String(event.getBody(), BODY_CHARSET)).find();
+    // Include mode keeps matches; exclude mode keeps non-matches.
+    boolean keep = excludeEvents ? !matched : matched;
+    return keep ? event : null;
+  }
+
+  /**
+   * Returns the events which pass the filter, according to
+   * {@link #intercept(Event)}, in their original order.
+   *
+   * @param events events to filter
+   * @return a new list holding only the events that passed; may be empty
+   */
+  @Override
+  public List<Event> intercept(List<Event> events) {
+    List<Event> out = Lists.newArrayList();
+    for (Event event : events) {
+      Event outEvent = intercept(event);
+      if (outEvent != null) {
+        out.add(outEvent);
+      }
+    }
+    return out;
+  }
+
+  @Override
+  public void close() {
+    // no-op
+  }
+
+  /**
+   * Builder which builds new instances of the RegexFilteringInterceptor.
+   */
+  public static class Builder implements Interceptor.Builder {
+
+    private Pattern regex;
+    private boolean excludeEvents;
+
+    /**
+     * Reads the {@code regex} and {@code excludeEvents} properties from the
+     * context, using the defaults in {@link Constants} when they are absent,
+     * and compiles the regular expression.
+     *
+     * @throws java.util.regex.PatternSyntaxException if the configured
+     *         regular expression is not valid
+     */
+    @Override
+    public void configure(Context context) {
+      String regexString = context.getString(REGEX, DEFAULT_REGEX);
+      regex = Pattern.compile(regexString);
+      excludeEvents = context.getBoolean(EXCLUDE_EVENTS,
+          DEFAULT_EXCLUDE_EVENTS);
+    }
+
+    /**
+     * Creates an interceptor from the values read by the last call to
+     * {@link #configure(Context)}.
+     */
+    @Override
+    public Interceptor build() {
+      logger.info(
+          "Creating RegexFilteringInterceptor: regex={},excludeEvents={}",
+          regex, excludeEvents);
+      return new RegexFilteringInterceptor(regex, excludeEvents);
+    }
+  }
+
+  /** Configuration property names and their default values. */
+  public static class Constants {
+
+    public static final String REGEX = "regex";
+    public static final String DEFAULT_REGEX = ".*";
+
+    public static final String EXCLUDE_EVENTS = "excludeEvents";
+    public static final boolean DEFAULT_EXCLUDE_EVENTS = false;
+  }
+
+}
\ No newline at end of file
Added:
flume/trunk/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexFilteringInterceptor.java
URL:
http://svn.apache.org/viewvc/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexFilteringInterceptor.java?rev=1364869&view=auto
==============================================================================
---
flume/trunk/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexFilteringInterceptor.java
(added)
+++
flume/trunk/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexFilteringInterceptor.java
Tue Jul 24 02:08:04 2012
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.interceptor;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.interceptor.RegexFilteringInterceptor.Constants;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+
+public class TestRegexFilteringInterceptor {
+
+  /** By default, we should pass through any event. */
+  @Test
+  public void testDefaultBehavior() throws ClassNotFoundException,
+      InstantiationException, IllegalAccessException {
+    Interceptor.Builder builder = InterceptorBuilderFactory.newInstance(
+        InterceptorType.REGEX_FILTER.toString());
+    builder.configure(new Context());
+    Interceptor interceptor = builder.build();
+
+    Event event = EventBuilder.withBody("test", Charsets.UTF_8);
+
+    Event filteredEvent = interceptor.intercept(event);
+    Assert.assertNotNull(filteredEvent);
+    Assert.assertEquals(event, filteredEvent);
+  }
+
+  /**
+   * With excludeEvents=false, only events whose body matches the regex
+   * should pass.
+   */
+  @Test
+  public void testInclusion() throws ClassNotFoundException,
+      InstantiationException, IllegalAccessException {
+    Interceptor.Builder builder = InterceptorBuilderFactory.newInstance(
+        InterceptorType.REGEX_FILTER.toString());
+
+    Context ctx = new Context();
+    ctx.put(Constants.REGEX, "(INFO.*)|(WARNING.*)");
+    ctx.put(Constants.EXCLUDE_EVENTS, "false");
+
+    builder.configure(ctx);
+    Interceptor interceptor = builder.build();
+
+    Event shouldPass1 = EventBuilder.withBody("INFO: some message",
+        Charsets.UTF_8);
+    Assert.assertNotNull(interceptor.intercept(shouldPass1));
+
+    Event shouldPass2 = EventBuilder.withBody("WARNING: some message",
+        Charsets.UTF_8);
+    Assert.assertNotNull(interceptor.intercept(shouldPass2));
+
+    Event shouldNotPass = EventBuilder.withBody("DEBUG: some message",
+        Charsets.UTF_8);
+    Assert.assertNull(interceptor.intercept(shouldNotPass));
+  }
+
+  /**
+   * With excludeEvents=true, events whose body matches the regex should be
+   * dropped and all others should pass.
+   */
+  @Test
+  public void testExclusion() throws ClassNotFoundException,
+      InstantiationException, IllegalAccessException {
+    Interceptor.Builder builder = InterceptorBuilderFactory.newInstance(
+        InterceptorType.REGEX_FILTER.toString());
+
+    Context ctx = new Context();
+    ctx.put(Constants.REGEX, ".*DEBUG.*");
+    ctx.put(Constants.EXCLUDE_EVENTS, "true");
+
+    builder.configure(ctx);
+    Interceptor interceptor = builder.build();
+
+    Event shouldPass1 = EventBuilder.withBody("INFO: some message",
+        Charsets.UTF_8);
+    Assert.assertNotNull(interceptor.intercept(shouldPass1));
+
+    Event shouldPass2 = EventBuilder.withBody("WARNING: some message",
+        Charsets.UTF_8);
+    Assert.assertNotNull(interceptor.intercept(shouldPass2));
+
+    Event shouldNotPass = EventBuilder.withBody("this message has DEBUG in it",
+        Charsets.UTF_8);
+    Assert.assertNull(interceptor.intercept(shouldNotPass));
+  }
+}
\ No newline at end of file
Modified: flume/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst
URL:
http://svn.apache.org/viewvc/flume/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst?rev=1364869&r1=1364868&r2=1364869&view=diff
==============================================================================
--- flume/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst (original)
+++ flume/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst Tue Jul 24 02:08:04 2012
@@ -1719,6 +1719,19 @@ Example for agent named **agent_foo**:
agent_foo.sources.source1.interceptors.inter1.key = datacenter
agent_foo.sources.source1.interceptors.inter1.value = NEW_YORK
+Regex Filtering Interceptor
+~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+This interceptor filters events selectively by interpreting the event body as
text and matching the text against a configured regular expression. The
supplied regular expression can be used to include events or exclude events.
+
+================ =======
========================================================================
+Property Name Default Description
+================ =======
========================================================================
+**type** -- The component type name has to be ``REGEX_FILTER``
+regex ".*" Regular expression for matching against events
+excludeEvents false If true, regex determines events to exclude,
otherwise regex determines events to include.
+================ =======
========================================================================
+
Flume Properties
----------------
@@ -1733,7 +1746,6 @@ flume.called.from.service -- If t
-Dflume.called.from.service is enough)
========================= =======
====================================================================
-
Property: flume.called.from.service
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~