Author: jarcec
Date: Tue Jul 24 02:08:04 2012
New Revision: 1364869

URL: http://svn.apache.org/viewvc?rev=1364869&view=rev
Log:
FLUME-1358. Add a regex-based filtering interceptor.

(Patrick Wendell via Jarek Jarcec Cecho)

Added:
    
flume/trunk/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexFilteringInterceptor.java
    
flume/trunk/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexFilteringInterceptor.java
Modified:
    
flume/trunk/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java
    flume/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst

Modified: 
flume/trunk/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java
URL: 
http://svn.apache.org/viewvc/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java?rev=1364869&r1=1364868&r2=1364869&view=diff
==============================================================================
--- 
flume/trunk/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java
 (original)
+++ 
flume/trunk/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java
 Tue Jul 24 02:08:04 2012
@@ -22,7 +22,8 @@ public enum InterceptorType {
 
   TIMESTAMP(org.apache.flume.interceptor.TimestampInterceptor.Builder.class),
   HOST(org.apache.flume.interceptor.HostInterceptor.Builder.class),
-  STATIC(org.apache.flume.interceptor.StaticInterceptor.Builder.class)
+  STATIC(org.apache.flume.interceptor.StaticInterceptor.Builder.class),
+  
REGEX_FILTER(org.apache.flume.interceptor.RegexFilteringInterceptor.Builder.class)
   ;
 
   private final Class<? extends Interceptor.Builder> builderClass;

Added: 
flume/trunk/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexFilteringInterceptor.java
URL: 
http://svn.apache.org/viewvc/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexFilteringInterceptor.java?rev=1364869&view=auto
==============================================================================
--- 
flume/trunk/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexFilteringInterceptor.java
 (added)
+++ 
flume/trunk/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexFilteringInterceptor.java
 Tue Jul 24 02:08:04 2012
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.interceptor;
+
+import static 
org.apache.flume.interceptor.RegexFilteringInterceptor.Constants.DEFAULT_EXCLUDE_EVENTS;
+import static 
org.apache.flume.interceptor.RegexFilteringInterceptor.Constants.DEFAULT_REGEX;
+import static 
org.apache.flume.interceptor.RegexFilteringInterceptor.Constants.EXCLUDE_EVENTS;
+import static 
org.apache.flume.interceptor.RegexFilteringInterceptor.Constants.REGEX;
+
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Interceptor that filters events selectively based on a configured regular
+ * expression matching against the event body.
+ *
+ * This supports either include- or exclude-based filtering. A given
+ * interceptor can only perform one of these functions, but multiple
+ * interceptor can be chained together to create more complex
+ * inclusion/exclusion patterns. If include-based filtering is configured, then
+ * all events matching the supplied regular expression will be passed through
+ * and all events not matching will be ignored. If exclude-based filtering is
+ * configured, than all events matching will be ignored, and all other events
+ * will pass through.
+ *
+ * Note that all regular expression matching occurs through Java's built in
+ * java.util.regex package.
+ *
+ * Properties:<p>
+ *
+ *   regex: Regular expression for matching excluded events.
+ *          (default is ".*")<p>
+ *
+ *   excludeEvents: If true, a regex match determines events to exclude,
+ *                  otherwise a regex determines events to include
+ *                  (default is false)<p>
+ *
+ * Sample config:<p>
+ *
+ * <code>
+ *   agent.sources.r1.channels = c1<p>
+ *   agent.sources.r1.type = SEQ<p>
+ *   agent.sources.r1.interceptors = i1<p>
+ *   agent.sources.r1.interceptors.i1.type = REGEX<p>
+ *   agent.sources.r1.interceptors.i1.regex = (WARNING)|(ERROR)|(FATAL)<p>
+ * </code>
+ *
+ */
+public class RegexFilteringInterceptor implements Interceptor {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(StaticInterceptor.class);
+
+  private final Pattern regex;
+  private final boolean excludeEvents;
+
+  /**
+   * Only {@link RegexFilteringInterceptor.Builder} can build me
+   */
+  private RegexFilteringInterceptor(Pattern regex, boolean excludeEvents) {
+    this.regex = regex;
+    this.excludeEvents = excludeEvents;
+  }
+
+  @Override
+  public void initialize() {
+    // no-op
+  }
+
+
+  @Override
+  /**
+   * Returns the event if it passes the regular expression filter and null
+   * otherwise.
+   */
+  public Event intercept(Event event) {
+    // We've already ensured here that at most one of includeRegex and
+    // excludeRegex are defined.
+
+    if (!excludeEvents) {
+      if (regex.matcher(new String(event.getBody())).find()) {
+        return event;
+      }
+      else {
+        return null;
+      }
+    }
+    else {
+      if (regex.matcher(new String(event.getBody())).find()) {
+        return null;
+      }
+      else {
+        return event;
+      }
+    }
+  }
+
+  /**
+   * Returns the set of events which pass filters, according to
+   * {@link #intercept(Event)}.
+   * @param events
+   * @return
+   */
+  @Override
+  public List<Event> intercept(List<Event> events) {
+    List<Event> out = Lists.newArrayList();
+    for (Event event : events) {
+      Event outEvent = intercept(event);
+      if (outEvent != null) { out.add(outEvent); }
+    }
+    return out;
+  }
+
+  @Override
+  public void close() {
+    // no-op
+  }
+
+  /**
+   * Builder which builds new instance of the StaticInterceptor.
+   */
+  public static class Builder implements Interceptor.Builder {
+
+    private Pattern regex;
+    private boolean excludeEvents;
+
+    @Override
+    public void configure(Context context) {
+      String regexString = context.getString(REGEX, DEFAULT_REGEX);
+      regex = Pattern.compile(regexString);
+      excludeEvents = context.getBoolean(EXCLUDE_EVENTS,
+          DEFAULT_EXCLUDE_EVENTS);
+    }
+
+    @Override
+    public Interceptor build() {
+      logger.info(String.format(
+          "Creating RegexFilteringInterceptor: regex=%s,excludeEvents=%s",
+          regex, excludeEvents));
+      return new RegexFilteringInterceptor(regex, excludeEvents);
+    }
+  }
+
+  public static class Constants {
+
+    public static final String REGEX = "regex";
+    public static final String DEFAULT_REGEX = ".*";
+
+    public static final String EXCLUDE_EVENTS = "excludeEvents";
+    public static final boolean DEFAULT_EXCLUDE_EVENTS = false;
+  }
+
+}
\ No newline at end of file

Added: 
flume/trunk/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexFilteringInterceptor.java
URL: 
http://svn.apache.org/viewvc/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexFilteringInterceptor.java?rev=1364869&view=auto
==============================================================================
--- 
flume/trunk/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexFilteringInterceptor.java
 (added)
+++ 
flume/trunk/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexFilteringInterceptor.java
 Tue Jul 24 02:08:04 2012
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.interceptor;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.interceptor.RegexFilteringInterceptor.Constants;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+
+public class TestRegexFilteringInterceptor {
+
+  @Test
+  /** By default, we should pass through any event. */
+  public void testDefaultBehavior() throws ClassNotFoundException,
+      InstantiationException, IllegalAccessException {
+    Interceptor.Builder builder = InterceptorBuilderFactory.newInstance(
+        InterceptorType.REGEX_FILTER.toString());
+    builder.configure(new Context());
+    Interceptor interceptor = builder.build();
+
+    Event event = EventBuilder.withBody("test", Charsets.UTF_8);
+
+    Event filteredEvent = interceptor.intercept(event);
+    Assert.assertNotNull(filteredEvent);
+    Assert.assertEquals(event, filteredEvent);
+  }
+
+  @Test
+  public void testInclusion() throws ClassNotFoundException,
+      InstantiationException, IllegalAccessException {
+    Interceptor.Builder builder = InterceptorBuilderFactory.newInstance(
+        InterceptorType.REGEX_FILTER.toString());
+
+    Context ctx = new Context();
+    ctx.put(Constants.REGEX, "(INFO.*)|(WARNING.*)");
+    ctx.put(Constants.EXCLUDE_EVENTS, "false");
+
+    builder.configure(ctx);
+    Interceptor interceptor = builder.build();
+
+    Event shouldPass1 = EventBuilder.withBody("INFO: some message",
+        Charsets.UTF_8);
+    Assert.assertNotNull(interceptor.intercept(shouldPass1));
+
+    Event shouldPass2 = EventBuilder.withBody("WARNING: some message",
+        Charsets.UTF_8);
+    Assert.assertNotNull(interceptor.intercept(shouldPass2));
+
+    Event shouldNotPass = EventBuilder.withBody("DEBUG: some message",
+        Charsets.UTF_8);
+    Assert.assertNull(interceptor.intercept(shouldNotPass));
+
+    builder.configure(ctx);
+  }
+
+  @Test
+  public void testExclusion() throws ClassNotFoundException,
+      InstantiationException, IllegalAccessException {
+    Interceptor.Builder builder = InterceptorBuilderFactory.newInstance(
+        InterceptorType.REGEX_FILTER.toString());
+
+    Context ctx = new Context();
+    ctx.put(Constants.REGEX, ".*DEBUG.*");
+    ctx.put(Constants.EXCLUDE_EVENTS, "true");
+
+    builder.configure(ctx);
+    Interceptor interceptor = builder.build();
+
+    Event shouldPass1 = EventBuilder.withBody("INFO: some message",
+        Charsets.UTF_8);
+    Assert.assertNotNull(interceptor.intercept(shouldPass1));
+
+    Event shouldPass2 = EventBuilder.withBody("WARNING: some message",
+        Charsets.UTF_8);
+    Assert.assertNotNull(interceptor.intercept(shouldPass2));
+
+    Event shouldNotPass = EventBuilder.withBody("this message has DEBUG in it",
+        Charsets.UTF_8);
+    Assert.assertNull(interceptor.intercept(shouldNotPass));
+
+    builder.configure(ctx);
+  }
+}
\ No newline at end of file

Modified: flume/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst
URL: 
http://svn.apache.org/viewvc/flume/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst?rev=1364869&r1=1364868&r2=1364869&view=diff
==============================================================================
--- flume/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst (original)
+++ flume/trunk/flume-ng-doc/sphinx/FlumeUserGuide.rst Tue Jul 24 02:08:04 2012
@@ -1719,6 +1719,19 @@ Example for agent named **agent_foo**:
   agent_foo.sources.source1.interceptors.inter1.key = datacenter
   agent_foo.sources.source1.interceptors.inter1.value = NEW_YORK
 
+Regex Filtering Interceptor
+~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+This interceptor filters events selectively by interpreting the event body as text and matching the text against a configured regular expression. The supplied regular expression can be used to include events or exclude events.
+
+================  =======  ========================================================================
+Property Name     Default  Description
+================  =======  ========================================================================
+**type**          --       The component type name has to be ``REGEX_FILTER``
+regex             ".*"     Regular expression for matching against events
+excludeEvents     false    If true, regex determines events to exclude, otherwise regex determines events to include.
+================  =======  ========================================================================
+
 Flume Properties
 ----------------
 
@@ -1733,7 +1746,6 @@ flume.called.from.service  --       If t
                                     -Dflume.called.from.service is enough)
 =========================  =======  
====================================================================
 
-
 Property: flume.called.from.service
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 


Reply via email to