Updated Branches: refs/heads/trunk 338630a68 -> d466e8cbc
FLUME-1657: Regex Extractor Interceptor (Cameron Gandevia via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/d466e8cb
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/d466e8cb
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/d466e8cb

Branch: refs/heads/trunk
Commit: d466e8cbc3f3c5a26e700b55732b1c059d23eb95
Parents: 338630a
Author: Brock Noland
Authored: Thu Nov 15 09:49:52 2012 -0600
Committer: Brock Noland
Committed: Thu Nov 15 09:49:52 2012 -0600

----------------------------------------------------------------------
 .../apache/flume/interceptor/InterceptorType.java  |   5 +-
 .../interceptor/RegexExtractorInterceptor.java     | 223 +++++++++++++++
 .../RegexExtractorInterceptorMillisSerializer.java |  56 ++++
 ...xExtractorInterceptorPassThroughSerializer.java |  44 +++
 .../RegexExtractorInterceptorSerializer.java       |  37 +++
 .../interceptor/TestRegexExtractorInterceptor.java | 148 ++++++++++
 ...tRegexExtractorInterceptorMillisSerializer.java |  61 ++++
 ...xExtractorInterceptorPassThroughSerializer.java |  34 +++
 flume-ng-doc/sphinx/FlumeUserGuide.rst             |  47 +++
 9 files changed, 653 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/d466e8cb/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java b/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java
index c478337..c84cea5 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java
@@ -23,8 +23,9 @@ public enum InterceptorType {
   TIMESTAMP(org.apache.flume.interceptor.TimestampInterceptor.Builder.class),
   HOST(org.apache.flume.interceptor.HostInterceptor.Builder.class),
   STATIC(org.apache.flume.interceptor.StaticInterceptor.Builder.class),
-  REGEX_FILTER(org.apache.flume.interceptor.RegexFilteringInterceptor.Builder.class)
-  ;
+  REGEX_FILTER(
+      org.apache.flume.interceptor.RegexFilteringInterceptor.Builder.class),
+  REGEX_EXTRACTOR(org.apache.flume.interceptor.RegexExtractorInterceptor.Builder.class);
 
   private final Class<? extends Interceptor.Builder> builderClass;
 

http://git-wip-us.apache.org/repos/asf/flume/blob/d466e8cb/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptor.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptor.java b/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptor.java
new file mode 100644
index 0000000..d9c3762
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptor.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.interceptor;
+
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+
+/**
+ * Interceptor that extracts matches using a specified regular expression and
+ * appends the matches to the event headers using the specified serializers.
+ * Note that all regular expression matching occurs through Java's built-in
+ * java.util.regex package. Properties:
+ * <p>
+ * regex: The regex to use
+ * <p>
+ * serializers: Comma-separated list of headerName:fully-qualified serializer
+ * class entries, one per match group. If the serializer class is omitted, the
+ * default {@link RegexExtractorInterceptorPassThroughSerializer} is used.
+ * <p>
+ * Sample config:
+ * <pre>
+ * agent.sources.r1.channels = c1
+ * agent.sources.r1.type = SEQ
+ * agent.sources.r1.interceptors = i1
+ * agent.sources.r1.interceptors.i1.type = REGEX_EXTRACTOR
+ * agent.sources.r1.interceptors.i1.regex = ^(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d)\\S*\\s+(WARNING|ERROR|FATAL)
+ * agent.sources.r1.interceptors.i1.serializers = timestamp:org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer,level
+ * agent.sources.r1.interceptors.i1.org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer.pattern = yyyy-MM-dd HH:mm
+ * </pre>
+ * <p>
+ * <pre>
+ * Example 1:
+ *
+ * EventBody: 1:2:3.4foobar5
+ *
+ * Configuration:
+ * agent.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
+ * agent.sources.r1.interceptors.i1.serializers = one,two,three
+ *
+ * results in an event with the following
+ *
+ * body: 1:2:3.4foobar5 headers: one=>1, two=>2, three=>3
+ *
+ * Example 2:
+ *
+ * EventBody: 1:2:3.4foobar5
+ *
+ * Configuration:
+ * agent.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
+ * agent.sources.r1.interceptors.i1.serializers = one,two
+ *
+ * results in an event with the following (the third group is skipped
+ * because no header name is configured for it)
+ *
+ * body: 1:2:3.4foobar5 headers: one=>1, two=>2
+ * </pre>
+ */
+public class RegexExtractorInterceptor implements Interceptor {
+
+  static final String REGEX = "regex";
+  static final String SERIALIZERS = "serializers";
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(RegexExtractorInterceptor.class);
+
+  private final Pattern regex;
+  private final List<NameAndSerializer> serializers;
+
+  private RegexExtractorInterceptor(Pattern regex,
+      List<NameAndSerializer> serializers) {
+    this.regex = regex;
+    this.serializers = serializers;
+  }
+
+  @Override
+  public void initialize() {
+    // NO-OP...
+  }
+
+  @Override
+  public void close() {
+    // NO-OP...
+  }
+
+  @Override
+  public Event intercept(Event event) {
+    Matcher matcher = regex.matcher(new String(event.getBody()));
+    Map<String, String> headers = event.getHeaders();
+    if (matcher.find()) {
+      for (int group = 0, count = matcher.groupCount(); group < count; group++) {
+        int groupIndex = group + 1;
+        if (groupIndex > serializers.size()) {
+          if (logger.isDebugEnabled()) {
+            logger.debug("Skipping group {} to {} due to missing serializer",
+                group, count);
+          }
+          break;
+        }
+        NameAndSerializer serializer = serializers.get(group);
+        if (logger.isDebugEnabled()) {
+          logger.debug("Serializing {} using {}", serializer.headerName,
+              serializer.serializer);
+        }
+        headers.put(serializer.headerName,
+            serializer.serializer.serialize(matcher.group(groupIndex)));
+      }
+    }
+    return event;
+  }
+
+  @Override
+  public List<Event> intercept(List<Event> events) {
+    List<Event> intercepted = Lists.newArrayListWithCapacity(events.size());
+    for (Event event : events) {
+      Event interceptedEvent = intercept(event);
+      if (interceptedEvent != null) {
+        intercepted.add(interceptedEvent);
+      }
+    }
+    return intercepted;
+  }
+
+  public static class Builder implements Interceptor.Builder {
+
+    private Pattern regex;
+    private List<NameAndSerializer> serializerList;
+    private final RegexExtractorInterceptorSerializer defaultSerializer = new RegexExtractorInterceptorPassThroughSerializer();
+
+    @Override
+    public void configure(Context context) {
+      String regexString = context.getString(REGEX);
+      Preconditions.checkArgument(!StringUtils.isEmpty(regexString),
+          "Must supply a valid regex string");
+      regex = Pattern.compile(regexString);
+
+      configureSerializers(context);
+    }
+
+    private void configureSerializers(Context context) {
+      String serializerString = context.getString(SERIALIZERS);
+      Preconditions.checkArgument(!StringUtils.isEmpty(serializerString),
+          "Must supply at least one name and serializer");
+
+      String[] nameAndSerializerList = serializerString.split(",");
+      serializerList = Lists
+          .newArrayListWithCapacity(nameAndSerializerList.length);
+      for (String nameAndSerializer : nameAndSerializerList) {
+        String[] splitNameAndSerializer = nameAndSerializer.split(":");
+        String name = splitNameAndSerializer[0].trim();
+        if (splitNameAndSerializer.length > 1) {
+          String serializer = splitNameAndSerializer[1].trim();
+          serializerList.add(new NameAndSerializer(name, getCustomSerializer(
+              serializer, context)));
+        } else {
+          serializerList.add(new NameAndSerializer(name, defaultSerializer));
+        }
+      }
+    }
+
+    private RegexExtractorInterceptorSerializer getCustomSerializer(
+        String clazzName, Context context) {
+      try {
+        Context serializerContext = new Context();
+        serializerContext.putAll(context.getSubProperties(clazzName + "."));
+        RegexExtractorInterceptorSerializer serializer = (RegexExtractorInterceptorSerializer) Class
+            .forName(clazzName).newInstance();
+        serializer.configure(serializerContext);
+        return serializer;
+      } catch (Exception e) {
+        logger.error("Could not instantiate event serializer.", e);
+        Throwables.propagate(e);
+      }
+      return defaultSerializer;
+    }
+
+    @Override
+    public Interceptor build() {
+      Preconditions.checkArgument(regex != null,
+          "Regex pattern was misconfigured");
+      Preconditions.checkArgument(serializerList.size() > 0,
+          "Must supply a valid group match id list");
+      return new RegexExtractorInterceptor(regex, serializerList);
+    }
+  }
+
+  static class NameAndSerializer {
+    private final String headerName;
+    private final RegexExtractorInterceptorSerializer serializer;
+
+    public NameAndSerializer(String headerName,
+        RegexExtractorInterceptorSerializer serializer) {
+      this.headerName = headerName;
+      this.serializer = serializer;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/d466e8cb/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptorMillisSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptorMillisSerializer.java b/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptorMillisSerializer.java
new file mode 100644
index 0000000..83bf0c9
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptorMillisSerializer.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.interceptor;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.Context;
+import org.apache.flume.conf.ComponentConfiguration;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Serializer that converts the passed-in value into epoch milliseconds using
+ * the specified pattern, interpreted in the JVM's default time zone
+ */
+public class RegexExtractorInterceptorMillisSerializer implements
+    RegexExtractorInterceptorSerializer {
+
+  private DateTimeFormatter formatter;
+
+  @Override
+  public void configure(Context context) {
+    String pattern = context.getString("pattern");
+    Preconditions.checkArgument(!StringUtils.isEmpty(pattern),
+        "Must configure with a valid pattern");
+    formatter = DateTimeFormat.forPattern(pattern);
+  }
+
+  @Override
+  public String serialize(String value) {
+    DateTime dateTime = formatter.parseDateTime(value);
+    return Long.toString(dateTime.getMillis());
+  }
+
+  @Override
+  public void configure(ComponentConfiguration conf) {
+    // NO-OP...
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/d466e8cb/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptorPassThroughSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptorPassThroughSerializer.java b/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptorPassThroughSerializer.java
new file mode 100644
index 0000000..cecf631
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptorPassThroughSerializer.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.interceptor;
+
+import org.apache.flume.Context;
+import org.apache.flume.conf.ComponentConfiguration;
+
+/**
+ * Serializer that simply returns the passed in value
+ */
+public class RegexExtractorInterceptorPassThroughSerializer implements
+    RegexExtractorInterceptorSerializer {
+
+  @Override
+  public String serialize(String value) {
+    return value;
+  }
+
+  @Override
+  public void configure(Context context) {
+    // NO-OP...
+  }
+
+  @Override
+  public void configure(ComponentConfiguration conf) {
+    // NO-OP...
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/d466e8cb/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptorSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptorSerializer.java b/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptorSerializer.java
new file mode 100644
index 0000000..6cca098
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/interceptor/RegexExtractorInterceptorSerializer.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.interceptor;
+
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.conf.ConfigurableComponent;
+
+/**
+ * Serializer for serializing groups matched by the
+ * {@link RegexExtractorInterceptor}
+ */
+public interface RegexExtractorInterceptorSerializer extends Configurable,
+    ConfigurableComponent {
+
+  /**
+   * @param value
+   *          The value extracted by the {@link RegexExtractorInterceptor}
+   * @return The serialized version of the specified value
+   */
+  String serialize(String value);
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/d466e8cb/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptor.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptor.java b/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptor.java
new file mode 100644
index 0000000..e03e9e2
--- /dev/null
+++ b/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptor.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.interceptor;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.interceptor.Interceptor.Builder;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+
+public class TestRegexExtractorInterceptor {
+
+  private Builder fixtureBuilder;
+
+  @Before
+  public void init() throws Exception {
+    fixtureBuilder = InterceptorBuilderFactory
+        .newInstance(InterceptorType.REGEX_EXTRACTOR.toString());
+  }
+
+  @Test
+  public void shouldNotAllowConfigurationWithoutRegex() throws Exception {
+    try {
+      fixtureBuilder.build();
+      Assert.fail();
+    } catch (IllegalArgumentException ex) {
+      // Pass...
+    }
+  }
+
+  @Test
+  public void shouldNotAllowConfigurationWithIllegalRegex() throws Exception {
+    try {
+      Context context = new Context();
+      context.put(RegexExtractorInterceptor.REGEX, "?&?&&&?&?&?&&&??");
+      fixtureBuilder.configure(context);
+      fixtureBuilder.build();
+      Assert.fail();
+    } catch (IllegalArgumentException ex) {
+      // Pass...
+    }
+  }
+
+  @Test
+  public void shouldNotAllowConfigurationWithoutMatchIds() throws Exception {
+    try {
+      Context context = new Context();
+      context.put(RegexExtractorInterceptor.REGEX, ".*");
+      context.put(RegexExtractorInterceptor.SERIALIZERS, "");
+      fixtureBuilder.configure(context);
+      fixtureBuilder.build();
+      Assert.fail();
+    } catch (IllegalArgumentException ex) {
+      // Pass...
+    }
+  }
+
+  @Test
+  public void shouldExtractAddHeadersForAllMatchGroups() throws Exception {
+    Context context = new Context();
+    context.put(RegexExtractorInterceptor.REGEX, "(\\d):(\\d):(\\d)");
+    context.put(RegexExtractorInterceptor.SERIALIZERS, "Num1,Num2,Num3");
+    fixtureBuilder.configure(context);
+    Interceptor fixture = fixtureBuilder.build();
+
+    Event event = EventBuilder.withBody("1:2:3.4foobar5", Charsets.UTF_8);
+
+    Event expected = EventBuilder.withBody("1:2:3.4foobar5", Charsets.UTF_8);
+    expected.getHeaders().put("Num1", "1");
+    expected.getHeaders().put("Num2", "2");
+    expected.getHeaders().put("Num3", "3");
+
+    Event actual = fixture.intercept(event);
+
+    Assert.assertArrayEquals(expected.getBody(), actual.getBody());
+    Assert.assertEquals(expected.getHeaders(), actual.getHeaders());
+  }
+
+  @Test
+  public void shouldExtractAddHeadersForAllMatchGroupsIgnoringMissingIds()
+      throws Exception {
+    String body = "2012-10-17 14:34:44,338";
+    Context context = new Context();
+    // Skip the second group
+    context.put(RegexExtractorInterceptor.REGEX,
+        "^(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d)(:\\d\\d,\\d\\d\\d)");
+    context.put(RegexExtractorInterceptor.SERIALIZERS, "timestamp");
+
+    fixtureBuilder.configure(context);
+    Interceptor fixture = fixtureBuilder.build();
+
+    Event event = EventBuilder.withBody(body, Charsets.UTF_8);
+    Event expected = EventBuilder.withBody(body, Charsets.UTF_8);
+    expected.getHeaders().put("timestamp", "2012-10-17 14:34");
+
+    Event actual = fixture.intercept(event);
+
+    Assert.assertArrayEquals(expected.getBody(), actual.getBody());
+    Assert.assertEquals(expected.getHeaders(), actual.getHeaders());
+
+  }
+
+  @Test
+  public void shouldExtractAddHeadersUsingSpecifiedSerializer()
+      throws Exception {
+    String body = "2012-10-17 14:34:44,338";
+    Context context = new Context();
+    // Map both groups: the first via the millis serializer, the second as-is
+    context.put(RegexExtractorInterceptor.REGEX,
+        "^(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d)(:\\d\\d,\\d\\d\\d)");
+    context.put(RegexExtractorInterceptor.SERIALIZERS, "timestamp:"
+        + RegexExtractorInterceptorMillisSerializer.class.getName() + ",data");
+    context.put(RegexExtractorInterceptorMillisSerializer.class.getName()
+        + ".pattern", "yyyy-MM-dd HH:mm");
+
+    fixtureBuilder.configure(context);
+    Interceptor fixture = fixtureBuilder.build();
+
+    Event event = EventBuilder.withBody(body, Charsets.UTF_8);
+    Event expected = EventBuilder.withBody(body, Charsets.UTF_8);
+    expected.getHeaders().put("timestamp", "1350509640000");
+    expected.getHeaders().put("data", ":44,338");
+
+    Event actual = fixture.intercept(event);
+
+    Assert.assertArrayEquals(expected.getBody(), actual.getBody());
+    Assert.assertEquals(expected.getHeaders(), actual.getHeaders());
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/d466e8cb/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorMillisSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorMillisSerializer.java b/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorMillisSerializer.java
new file mode 100644
index 0000000..1f87d9a
--- /dev/null
+++ b/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorMillisSerializer.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.interceptor;
+
+import junit.framework.Assert;
+
+import org.apache.flume.Context;
+import org.junit.Test;
+
+public class TestRegexExtractorInterceptorMillisSerializer {
+
+  @Test
+  public void shouldRequirePatternInConfiguration() {
+    try {
+      RegexExtractorInterceptorMillisSerializer fixture = new RegexExtractorInterceptorMillisSerializer();
+      fixture.configure(new Context());
+      Assert.fail();
+    } catch (IllegalArgumentException ex) {
+      // Expected...
+    }
+  }
+
+  @Test
+  public void shouldRequireValidPatternInConfiguration() {
+    try {
+      RegexExtractorInterceptorMillisSerializer fixture = new RegexExtractorInterceptorMillisSerializer();
+      Context context = new Context();
+      context.put("pattern", "ABCDEFG");
+      fixture.configure(context);
+      Assert.fail();
+    } catch (IllegalArgumentException ex) {
+      // Expected...
+    }
+  }
+
+  @Test
+  public void shouldReturnMillisFromPattern() {
+    RegexExtractorInterceptorMillisSerializer fixture = new RegexExtractorInterceptorMillisSerializer();
+    Context context = new Context();
+    context.put("pattern", "yyyy-MM-dd HH:mm:ss");
+    fixture.configure(context);
+
+    Assert.assertEquals("1269616953000",
+        fixture.serialize("2010-03-26 08:22:33"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/d466e8cb/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorPassThroughSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorPassThroughSerializer.java b/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorPassThroughSerializer.java
new file mode 100644
index 0000000..569c274
--- /dev/null
+++ b/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorPassThroughSerializer.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.interceptor;
+
+import junit.framework.Assert;
+
+import org.apache.flume.Context;
+import org.junit.Test;
+
+public class TestRegexExtractorInterceptorPassThroughSerializer {
+
+  @Test
+  public void shouldReturnSameValue() {
+    RegexExtractorInterceptorPassThroughSerializer fixture = new RegexExtractorInterceptorPassThroughSerializer();
+    fixture.configure(new Context());
+    String input = "testing (1,2,3,4)";
+    Assert.assertEquals(input, fixture.serialize(input));
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/d466e8cb/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 6670aa8..e0f1d19 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -2095,6 +2095,53 @@ regex            ".*"    Regular expression for matching against events
 excludeEvents    false   If true, regex determines events to exclude, otherwise regex determines events to include.
 ================ ======= ========================================================================
 
+Regex Extractor Interceptor
+~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+This interceptor extracts regex match groups using a specified regular expression and appends the match groups as headers on the event. It also supports pluggable serializers for formatting the match groups before adding them as event headers.
+
+================ ============================== ================================================================================
+Property Name    Default                        Description
+================ ============================== ================================================================================
+**type**         --                             The component type name has to be ``REGEX_EXTRACTOR``
+**regex**        --                             Regular expression for matching against events
+**serializers**  --                             Comma-separated list of ``headerName[:serializerFQCN]`` entries, one per match group (see the examples below).
+                                                The following serializers are supported out of the box:
+                                                ``org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer``
+                                                ``org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer``
+================ ============================== ================================================================================
+
+The serializers map each match group, in order, to a header name and a formatted header value. By default you only need to specify
+the header name, and the default ``org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer`` will be used;
+it passes the value through exactly as it was extracted by the regex. Match groups without a corresponding header name are ignored.
+To format the matches in any other way, plug in a custom serializer implementation by giving its fully qualified class name (FQCN)
+after the header name. A serializer's own properties are set on the interceptor, prefixed with the serializer's FQCN (see Example 2).
+
+Example 1:
+~~~~~~~~~~
+
+If the Flume event body contains ``1:2:3.4foobar5`` and the following configuration is used
+
+.. code-block:: properties
+
+  agent.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
+  agent.sources.r1.interceptors.i1.serializers = one,two,three
+
+then the event keeps the same body and the following headers are added: ``one=>1, two=>2, three=>3``
+
+Example 2:
+~~~~~~~~~~
+
+If the Flume event body contains ``2012-10-18 18:47:57,614 some log line`` and the following configuration is used
+
+.. code-block:: properties
+
+  agent.sources.r1.interceptors.i1.regex = ^(?:\\n)?(\\d\\d\\d\\d-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d)
+  agent.sources.r1.interceptors.i1.serializers = timestamp:org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
+  agent.sources.r1.interceptors.i1.org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer.pattern = yyyy-MM-dd HH:mm
+
+then the event keeps the same body and the following header is added: ``timestamp=>1350611220000``. The date is interpreted in the JVM's default time zone, so the exact value depends on the agent's host; the value shown corresponds to UTC-7.
+
 Flume Properties
 ----------------
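The group-to-header mapping that the interceptor's Javadoc and user-guide examples describe can be illustrated outside Flume with a short standalone Java sketch. This is not the committed `RegexExtractorInterceptor`. It rests on three simplifications. First, `RegexExtractorSketch`, `HeaderSpec` and `extract` are hypothetical names. Second, a plain `UnaryOperator<String>` stands in for `RegexExtractorInterceptorSerializer`. Third, the headers are returned as a map instead of being written into a Flume `Event`. Like the committed code, the sketch calls `matcher.find()` once, so only the first match in the body is used.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical standalone sketch of the group-to-header mapping; not Flume API. */
public class RegexExtractorSketch {

  /** One "serializers" entry: a header name plus a stand-in value serializer. */
  static final class HeaderSpec {
    final String headerName;
    final UnaryOperator<String> serializer;

    HeaderSpec(String headerName, UnaryOperator<String> serializer) {
      this.headerName = headerName;
      this.serializer = serializer;
    }
  }

  /** Returns the headers that would be added for the given event body. */
  static Map<String, String> extract(Pattern regex, List<HeaderSpec> specs, String body) {
    Map<String, String> headers = new LinkedHashMap<>();
    Matcher matcher = regex.matcher(body);
    if (matcher.find()) { // only the first match is used
      // Group i (1-based) goes to specs.get(i - 1); groups past the last spec are ignored.
      int limit = Math.min(matcher.groupCount(), specs.size());
      for (int group = 1; group <= limit; group++) {
        HeaderSpec spec = specs.get(group - 1);
        headers.put(spec.headerName, spec.serializer.apply(matcher.group(group)));
      }
    }
    return headers;
  }

  public static void main(String[] args) {
    Pattern regex = Pattern.compile("(\\d):(\\d):(\\d)");
    UnaryOperator<String> passThrough = UnaryOperator.identity();
    String body = "1:2:3.4foobar5";

    // Like "serializers = one,two,three"
    System.out.println(extract(regex, List.of(
        new HeaderSpec("one", passThrough),
        new HeaderSpec("two", passThrough),
        new HeaderSpec("three", passThrough)), body));

    // Like "serializers = one,two": the third group has no header name
    System.out.println(extract(regex, List.of(
        new HeaderSpec("one", passThrough),
        new HeaderSpec("two", passThrough)), body));
  }
}
```

Running `main` prints `{one=1, two=2, three=3}` and then `{one=1, two=2}`, which mirrors the two Javadoc examples. A `LinkedHashMap` is used only so that the printed headers keep the group order.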

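`RegexExtractorInterceptorMillisSerializer` builds its Joda-Time formatter from the configured pattern alone. As a result, the parsed value is interpreted in the JVM's default time zone. The expected values in Example 2 (`1350611220000`) and in the tests (`1350509640000`, `1269616953000`) all correspond to UTC-7.

The rough sketch below performs the same pattern-to-epoch-millis conversion with an explicit zone. It swaps in `java.time` for Joda-Time so that it has no dependencies, and `MillisSketch` and `toMillis` are hypothetical names, not part of Flume.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical sketch: pattern-based conversion to epoch millis in an explicit zone. */
public class MillisSketch {

  static String toMillis(String value, String pattern, ZoneId zone) {
    // The pattern carries no zone information, so parse to a local date-time...
    LocalDateTime local = LocalDateTime.parse(value, DateTimeFormatter.ofPattern(pattern));
    // ...then pin that wall-clock time to an explicit zone before converting.
    return Long.toString(local.atZone(zone).toInstant().toEpochMilli());
  }

  public static void main(String[] args) {
    String value = "2012-10-18 18:47"; // the group captured in Example 2
    String pattern = "yyyy-MM-dd HH:mm";
    System.out.println(toMillis(value, pattern, ZoneOffset.ofHours(-7))); // 1350611220000
    System.out.println(toMillis(value, pattern, ZoneOffset.UTC));         // 1350586020000
  }
}
```

The same wall-clock text yields values seven hours apart depending on the zone. Pinning the zone like this is one way to make such a conversion deterministic; the serializer in this commit only reads a `pattern` property and does not expose a time-zone setting.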