Repository: flume
Updated Branches:
  refs/heads/trunk 33cdcf0d4 -> 09472ba12


FLUME-2273 - Add handling for header substitution in ElasticSearchSink

Satoshi Iijima via Juhani Connolly


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/09472ba1
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/09472ba1
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/09472ba1

Branch: refs/heads/trunk
Commit: 09472ba12278a0d3696b9d2e26d6d1b0d361c830
Parents: 33cdcf0
Author: Juhani Connolly <[email protected]>
Authored: Tue Jun 3 11:33:54 2014 +0900
Committer: Juhani Connolly <[email protected]>
Committed: Tue Jun 3 11:33:54 2014 +0900

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  6 ++++++
 ...ElasticSearchIndexRequestBuilderFactory.java |  9 +++++++--
 .../sink/elasticsearch/ElasticSearchSink.java   |  6 +++---
 .../elasticsearch/SimpleIndexNameBuilder.java   |  5 +++--
 .../TimeBasedIndexNameBuilder.java              |  6 ++++--
 .../client/ElasticSearchRestClient.java         |  2 --
 .../client/ElasticSearchTransportClient.java    |  1 -
 ...ElasticSearchIndexRequestBuilderFactory.java | 21 ++++++++++++++++++++
 8 files changed, 44 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/09472ba1/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst 
b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 7732c13..040fc8b 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -2008,7 +2008,9 @@ Property Name     Default
 **type**          --                                                           
            The component type name, needs to be 
``org.apache.flume.sink.elasticsearch.ElasticSearchSink``
 **hostNames**     --                                                           
            Comma separated list of hostname:port, if the port is not present 
the default port '9300' will be used
 indexName         flume                                                        
            The name of the index which the date will be appended to. Example 
'flume' -> 'flume-yyyy-MM-dd'
+                                                                               
            Arbitrary header substitution is supported, e.g. %{header} is replaced 
with the value of the named event header
 indexType         logs                                                         
            The type to index the document to, defaults to 'log'
+                                                                               
            Arbitrary header substitution is supported, e.g. %{header} is replaced 
with the value of the named event header
 clusterName       elasticsearch                                                
            Name of the ElasticSearch cluster to connect to
 batchSize         100                                                          
            Number of events to be written per txn.
 ttl               --                                                           
            TTL in days, when set will cause the expired documents to be 
deleted automatically,
@@ -2021,6 +2023,10 @@ serializer        
org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEvent
 serializer.*      --                                                           
            Properties to be passed to the serializer.
 ================  
======================================================================== 
=======================================================================================================
 
+.. note:: Header substitution is a handy way to use the value of an event header 
to dynamically decide the indexName and indexType to use when storing the event.
+          Use this feature with caution, as the event submitter 
now controls the indexName and indexType.
+          Furthermore, if the ElasticSearch REST client is used, the event 
submitter also controls the URL path used.
+
 Example for agent named a1:
 
 .. code-block:: properties

http://git-wip-us.apache.org/repos/asf/flume/blob/09472ba1/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java
 
b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java
index de84b95..9996142 100644
--- 
a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java
+++ 
b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java
@@ -26,6 +26,7 @@ import org.apache.flume.Event;
 import org.apache.flume.conf.ComponentConfiguration;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.conf.ConfigurableComponent;
+import org.apache.flume.formatter.output.BucketPath;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.client.Client;
 
@@ -75,10 +76,14 @@ public abstract class 
AbstractElasticSearchIndexRequestBuilderFactory
   public IndexRequestBuilder createIndexRequest(Client client,
         String indexPrefix, String indexType, Event event) throws IOException {
     IndexRequestBuilder request = prepareIndex(client);
+    String realIndexPrefix = BucketPath.escapeString(indexPrefix, 
event.getHeaders());
+    String realIndexType = BucketPath.escapeString(indexType, 
event.getHeaders());
+
     TimestampedEvent timestampedEvent = new TimestampedEvent(event);
     long timestamp = timestampedEvent.getTimestamp();
-    String indexName = getIndexName(indexPrefix, timestamp);
-    prepareIndexRequest(request, indexName, indexType, timestampedEvent);
+
+    String indexName = getIndexName(realIndexPrefix, timestamp);
+    prepareIndexRequest(request, indexName, realIndexType, timestampedEvent);
     return request;
   }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/09472ba1/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
 
b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
index 39b6db5..1d9dfce 100644
--- 
a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
+++ 
b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
@@ -38,6 +38,7 @@ import org.apache.flume.CounterGroup;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.Transaction;
+import org.apache.flume.formatter.output.BucketPath;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.flume.sink.AbstractSink;
@@ -61,8 +62,6 @@ import static 
org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEF
 import static 
org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_NAME_BUILDER;
 import static 
org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_NAME_BUILDER_PREFIX;
 
-import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
-
 /**
  * A sink which reads events from a channel and writes them to ElasticSearch
  * based on the work done by https://github.com/Aconex/elasticflume.git.</p>
@@ -186,7 +185,8 @@ public class ElasticSearchSink extends AbstractSink 
implements Configurable {
         if (event == null) {
           break;
         }
-        client.addEvent(event, indexNameBuilder, indexType, ttlMs);
+        String realIndexType = BucketPath.escapeString(indexType, 
event.getHeaders());
+        client.addEvent(event, indexNameBuilder, realIndexType, ttlMs);
       }
 
       if (count <= 0) {

http://git-wip-us.apache.org/repos/asf/flume/blob/09472ba1/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/SimpleIndexNameBuilder.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/SimpleIndexNameBuilder.java
 
b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/SimpleIndexNameBuilder.java
index 19079af..801cac9 100644
--- 
a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/SimpleIndexNameBuilder.java
+++ 
b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/SimpleIndexNameBuilder.java
@@ -19,6 +19,7 @@ package org.apache.flume.sink.elasticsearch;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.conf.ComponentConfiguration;
+import org.apache.flume.formatter.output.BucketPath;
 
 public class SimpleIndexNameBuilder implements IndexNameBuilder {
 
@@ -26,12 +27,12 @@ public class SimpleIndexNameBuilder implements 
IndexNameBuilder {
 
   @Override
   public String getIndexName(Event event) {
-    return indexName;
+    return BucketPath.escapeString(indexName, event.getHeaders());
   }
 
   @Override
   public String getIndexPrefix(Event event) {
-    return indexName;
+    return BucketPath.escapeString(indexName, event.getHeaders());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/flume/blob/09472ba1/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/TimeBasedIndexNameBuilder.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/TimeBasedIndexNameBuilder.java
 
b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/TimeBasedIndexNameBuilder.java
index a8603a4..c651732 100644
--- 
a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/TimeBasedIndexNameBuilder.java
+++ 
b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/TimeBasedIndexNameBuilder.java
@@ -24,6 +24,7 @@ import org.apache.commons.lang.time.FastDateFormat;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.conf.ComponentConfiguration;
+import org.apache.flume.formatter.output.BucketPath;
 
 import java.util.TimeZone;
 
@@ -60,13 +61,14 @@ public class TimeBasedIndexNameBuilder implements
   public String getIndexName(Event event) {
     TimestampedEvent timestampedEvent = new TimestampedEvent(event);
     long timestamp = timestampedEvent.getTimestamp();
-    return new StringBuilder(indexPrefix).append('-')
+    String realIndexPrefix = BucketPath.escapeString(indexPrefix, 
event.getHeaders());
+    return new StringBuilder(realIndexPrefix).append('-')
       .append(fastDateFormat.format(timestamp)).toString();
   }
   
   @Override
   public String getIndexPrefix(Event event) {
-    return indexPrefix;
+    return BucketPath.escapeString(indexPrefix, event.getHeaders());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/flume/blob/09472ba1/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchRestClient.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchRestClient.java
 
b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchRestClient.java
index ff95e30..0d1c37f 100644
--- 
a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchRestClient.java
+++ 
b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchRestClient.java
@@ -116,8 +116,6 @@ public class  ElasticSearchRestClient implements 
ElasticSearchClient {
   public void execute() throws Exception {
     int statusCode = 0, triesCount = 0;
     HttpResponse response = null;
-    logger.info("Sending bulk request to elasticsearch cluster");
-
     String entity;
     synchronized (bulkBuilder) {
       entity = bulkBuilder.toString();

http://git-wip-us.apache.org/repos/asf/flume/blob/09472ba1/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchTransportClient.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchTransportClient.java
 
b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchTransportClient.java
index e9ed0b4..d44c8ad 100644
--- 
a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchTransportClient.java
+++ 
b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchTransportClient.java
@@ -178,7 +178,6 @@ public class ElasticSearchTransportClient implements 
ElasticSearchClient {
   @Override
   public void execute() throws Exception {
     try {
-      logger.info("Sending bulk to elasticsearch cluster");
       BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();
       if (bulkResponse.hasFailures()) {
         throw new EventDeliveryException(bulkResponse.buildFailureMessage());

http://git-wip-us.apache.org/repos/asf/flume/blob/09472ba1/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java
 
b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java
index 807a9c7..8022111 100644
--- 
a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java
+++ 
b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java
@@ -158,6 +158,27 @@ public class TestElasticSearchIndexRequestBuilderFactory
   }
 
   @Test
+  public void shouldSetIndexNameTypeFromHeaderWhenPresent()
+      throws Exception {
+    String indexPrefix = "%{index-name}";
+    String indexType = "%{index-type}";
+    String indexValue = "testing-index-name-from-headers";
+    String typeValue = "testing-index-type-from-headers";
+
+    Event event = new SimpleEvent();
+    event.getHeaders().put("index-name", indexValue);
+    event.getHeaders().put("index-type", typeValue);
+
+    IndexRequestBuilder indexRequestBuilder = factory.createIndexRequest(
+        null, indexPrefix, indexType, event);
+
+    assertEquals(indexValue + '-'
+        + ElasticSearchIndexRequestBuilderFactory.df.format(FIXED_TIME_MILLIS),
+      indexRequestBuilder.request().index());
+    assertEquals(typeValue, indexRequestBuilder.request().type());
+  }
+
+  @Test
   public void shouldConfigureEventSerializer() throws Exception {
     assertFalse(serializer.configuredWithContext);
     factory.configure(new Context());

Reply via email to