Author: hshreedharan
Date: Tue Jul 10 05:11:27 2012
New Revision: 1359481
URL: http://svn.apache.org/viewvc?rev=1359481&view=rev
Log:
FLUME-1276. Add a Static Header Interceptor.
(Patrick Wendell via Hari Shreedharan)
Added:
flume/trunk/flume-ng-core/src/main/java/org/apache/flume/interceptor/StaticInterceptor.java
(with props)
flume/trunk/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestStaticInterceptor.java
(with props)
Modified:
flume/trunk/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java
Modified:
flume/trunk/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java
URL:
http://svn.apache.org/viewvc/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java?rev=1359481&r1=1359480&r2=1359481&view=diff
==============================================================================
---
flume/trunk/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java
(original)
+++
flume/trunk/flume-ng-core/src/main/java/org/apache/flume/interceptor/InterceptorType.java
Tue Jul 10 05:11:27 2012
@@ -22,6 +22,7 @@ public enum InterceptorType {
TIMESTAMP(org.apache.flume.interceptor.TimestampInterceptor.Builder.class),
HOST(org.apache.flume.interceptor.HostInterceptor.Builder.class),
+ STATIC(org.apache.flume.interceptor.StaticInterceptor.Builder.class)
;
private final Class<? extends Interceptor.Builder> builderClass;
Added:
flume/trunk/flume-ng-core/src/main/java/org/apache/flume/interceptor/StaticInterceptor.java
URL:
http://svn.apache.org/viewvc/flume/trunk/flume-ng-core/src/main/java/org/apache/flume/interceptor/StaticInterceptor.java?rev=1359481&view=auto
==============================================================================
---
flume/trunk/flume-ng-core/src/main/java/org/apache/flume/interceptor/StaticInterceptor.java
(added)
+++
flume/trunk/flume-ng-core/src/main/java/org/apache/flume/interceptor/StaticInterceptor.java
Tue Jul 10 05:11:27 2012
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.interceptor;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flume.interceptor.StaticInterceptor.Constants.*;
+
+/**
+ * Interceptor class that appends a static, pre-configured header to all events.
+ *
+ * Properties:<p>
+ *
+ * key: Key to use in static header insertion.
+ * (default is "key")<p>
+ *
+ * value: Value to use in static header insertion.
+ * (default is "value")<p>
+ *
+ * preserveExisting: Whether to preserve an existing value for 'key'
+ * (default is true)<p>
+ *
+ * Sample config:<p>
+ *
+ * <code>
+ * agent.sources.r1.channels = c1<p>
+ * agent.sources.r1.type = SEQ<p>
+ * agent.sources.r1.interceptors = i1<p>
+ * agent.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.StaticInterceptor$Builder<p>
+ * agent.sources.r1.interceptors.i1.preserveExisting = false<p>
+ * agent.sources.r1.interceptors.i1.key = datacenter<p>
+ * agent.sources.r1.interceptors.i1.value= NYC_01<p>
+ * </code>
+ *
+ */
+public class StaticInterceptor implements Interceptor {
+
+  private static final Logger logger = LoggerFactory
+      .getLogger(StaticInterceptor.class);
+
+  private final boolean preserveExisting;
+  private final String key;
+  private final String value;
+
+  /**
+   * Only {@link StaticInterceptor.Builder} can build me
+   */
+  private StaticInterceptor(boolean preserveExisting, String key,
+      String value) {
+    this.preserveExisting = preserveExisting;
+    this.key = key;
+    this.value = value;
+  }
+
+  @Override
+  public void initialize() {
+    // no-op
+  }
+
+  /**
+   * Modifies events in-place and returns the same event instance.
+   */
+  @Override
+  public Event intercept(Event event) {
+    Map<String, String> headers = event.getHeaders();
+    // With preserveExisting, a header the event already carries is left alone.
+    if (preserveExisting && headers.containsKey(key)) {
+      return event;
+    }
+    // Otherwise add the header, overwriting any previous value for the key.
+    headers.put(key, value);
+    return event;
+  }
+
+  /**
+   * Delegates to {@link #intercept(Event)} in a loop.
+   * @param events events to tag; each one is modified in place
+   * @return the same list instance that was passed in
+   */
+  @Override
+  public List<Event> intercept(List<Event> events) {
+    for (Event event : events) {
+      intercept(event);
+    }
+    return events;
+  }
+
+  @Override
+  public void close() {
+    // no-op
+  }
+
+  /**
+   * Builder which builds new instance of the StaticInterceptor.
+   */
+  public static class Builder implements Interceptor.Builder {
+    // Start from the defaults so that build() without a prior configure()
+    // call cannot yield an interceptor that inserts a null key and value.
+    private boolean preserveExisting = PRESERVE_DEFAULT;
+    private String key = KEY_DEFAULT;
+    private String value = VALUE_DEFAULT;
+
+    /** Reads the properties from the context, falling back to defaults. */
+    @Override
+    public void configure(Context context) {
+      preserveExisting = context.getBoolean(PRESERVE, PRESERVE_DEFAULT);
+      key = context.getString(KEY, KEY_DEFAULT);
+      value = context.getString(VALUE, VALUE_DEFAULT);
+    }
+
+    @Override
+    public Interceptor build() {
+      logger.info(String.format(
+          "Creating StaticInterceptor: preserveExisting=%s,key=%s,value=%s",
+          preserveExisting, key, value));
+      return new StaticInterceptor(preserveExisting, key, value);
+    }
+  }
+
+  public static class Constants {
+    // Property names read from the interceptor's Context, with their defaults.
+    public static final String KEY = "key";
+    public static final String KEY_DEFAULT = "key";
+
+    public static final String VALUE = "value";
+    public static final String VALUE_DEFAULT = "value";
+
+    public static final String PRESERVE = "preserveExisting";
+    public static final boolean PRESERVE_DEFAULT = true;
+  }
+}
Propchange:
flume/trunk/flume-ng-core/src/main/java/org/apache/flume/interceptor/StaticInterceptor.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
flume/trunk/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestStaticInterceptor.java
URL:
http://svn.apache.org/viewvc/flume/trunk/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestStaticInterceptor.java?rev=1359481&view=auto
==============================================================================
---
flume/trunk/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestStaticInterceptor.java
(added)
+++
flume/trunk/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestStaticInterceptor.java
Tue Jul 10 05:11:27 2012
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.interceptor;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.interceptor.StaticInterceptor.Constants;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+
+public class TestStaticInterceptor {
+  @Test
+  public void testDefaultKeyValue() throws ClassNotFoundException,
+      InstantiationException, IllegalAccessException {
+    Interceptor.Builder builder = InterceptorBuilderFactory.newInstance(
+        InterceptorType.STATIC.toString());
+    builder.configure(new Context());
+    Interceptor interceptor = builder.build();
+    // Nothing was configured, so the default header key and value apply.
+    Event event = EventBuilder.withBody("test", Charsets.UTF_8);
+    Assert.assertNull(event.getHeaders().get(Constants.KEY_DEFAULT));
+
+    event = interceptor.intercept(event);
+    String val = event.getHeaders().get(Constants.KEY_DEFAULT);
+
+    Assert.assertNotNull(val);
+    Assert.assertEquals(Constants.VALUE_DEFAULT, val);
+  }
+
+  @Test
+  public void testCustomKeyValue() throws ClassNotFoundException,
+      InstantiationException, IllegalAccessException {
+    Interceptor.Builder builder = InterceptorBuilderFactory.newInstance(
+        InterceptorType.STATIC.toString());
+    Context ctx = new Context();
+    ctx.put(Constants.KEY, "myKey");
+    ctx.put(Constants.VALUE, "myVal");
+    // Both properties are set, so the configured pair replaces the defaults.
+    builder.configure(ctx);
+    Interceptor interceptor = builder.build();
+
+    Event event = EventBuilder.withBody("test", Charsets.UTF_8);
+    Assert.assertNull(event.getHeaders().get("myKey"));
+
+    event = interceptor.intercept(event);
+    String val = event.getHeaders().get("myKey");
+
+    Assert.assertNotNull(val);
+    Assert.assertEquals("myVal", val);
+  }
+
+  @Test
+  public void testReplace() throws ClassNotFoundException,
+      InstantiationException, IllegalAccessException {
+    Interceptor.Builder builder = InterceptorBuilderFactory.newInstance(
+        InterceptorType.STATIC.toString());
+    Context ctx = new Context();
+    ctx.put(Constants.PRESERVE, "false");
+    ctx.put(Constants.VALUE, "replacement value");
+    // The key property is left unset, so the default header key is targeted.
+    builder.configure(ctx);
+    Interceptor interceptor = builder.build();
+
+    Event event = EventBuilder.withBody("test", Charsets.UTF_8);
+    event.getHeaders().put(Constants.KEY_DEFAULT, "incumbent value");
+
+    Assert.assertNotNull(event.getHeaders().get(Constants.KEY_DEFAULT));
+
+    event = interceptor.intercept(event);
+    String val = event.getHeaders().get(Constants.KEY_DEFAULT);
+
+    Assert.assertNotNull(val);
+    Assert.assertEquals("replacement value", val);
+  }
+
+  @Test
+  public void testPreserve() throws ClassNotFoundException,
+      InstantiationException, IllegalAccessException {
+    Interceptor.Builder builder = InterceptorBuilderFactory.newInstance(
+        InterceptorType.STATIC.toString());
+    Context ctx = new Context();
+    ctx.put(Constants.PRESERVE, "true");
+    ctx.put(Constants.VALUE, "replacement value");
+    // The key property is left unset, so the default header key is targeted.
+    builder.configure(ctx);
+    Interceptor interceptor = builder.build();
+
+    Event event = EventBuilder.withBody("test", Charsets.UTF_8);
+    event.getHeaders().put(Constants.KEY_DEFAULT, "incumbent value");
+
+    Assert.assertNotNull(event.getHeaders().get(Constants.KEY_DEFAULT));
+
+    event = interceptor.intercept(event);
+    String val = event.getHeaders().get(Constants.KEY_DEFAULT);
+
+    Assert.assertNotNull(val);
+    Assert.assertEquals("incumbent value", val);
+  }
+}
\ No newline at end of file
Propchange:
flume/trunk/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestStaticInterceptor.java
------------------------------------------------------------------------------
svn:eol-style = native