Repository: flume Updated Branches: refs/heads/trunk 33cdcf0d4 -> 09472ba12
FLUME-2273 - Add handling for header substitution in ElasticSearchSink Satoshi Iijima via Juhani Connolly Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/09472ba1 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/09472ba1 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/09472ba1 Branch: refs/heads/trunk Commit: 09472ba12278a0d3696b9d2e26d6d1b0d361c830 Parents: 33cdcf0 Author: Juhani Connolly <[email protected]> Authored: Tue Jun 3 11:33:54 2014 +0900 Committer: Juhani Connolly <[email protected]> Committed: Tue Jun 3 11:33:54 2014 +0900 ---------------------------------------------------------------------- flume-ng-doc/sphinx/FlumeUserGuide.rst | 6 ++++++ ...ElasticSearchIndexRequestBuilderFactory.java | 9 +++++++-- .../sink/elasticsearch/ElasticSearchSink.java | 6 +++--- .../elasticsearch/SimpleIndexNameBuilder.java | 5 +++-- .../TimeBasedIndexNameBuilder.java | 6 ++++-- .../client/ElasticSearchRestClient.java | 2 -- .../client/ElasticSearchTransportClient.java | 1 - ...ElasticSearchIndexRequestBuilderFactory.java | 21 ++++++++++++++++++++ 8 files changed, 44 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/09472ba1/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 7732c13..040fc8b 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -2008,7 +2008,9 @@ Property Name Default **type** -- The component type name, needs to be ``org.apache.flume.sink.elasticsearch.ElasticSearchSink`` **hostNames** -- Comma separated list of hostname:port, if the port is not present the default port '9300' will be used indexName flume The name of the index which the date will be appended to. 
Example 'flume' -> 'flume-yyyy-MM-dd' + Arbitrary header substitution is supported, e.g. %{header} is replaced with the value of the named event header indexType logs The type to index the document to, defaults to 'log' + Arbitrary header substitution is supported, e.g. %{header} is replaced with the value of the named event header clusterName elasticsearch Name of the ElasticSearch cluster to connect to batchSize 100 Number of events to be written per txn. ttl -- TTL in days, when set will cause the expired documents to be deleted automatically, @@ -2021,6 +2023,10 @@ serializer org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEvent serializer.* -- Properties to be passed to the serializer. ================ ======================================================================== ======================================================================================================= +.. note:: Header substitution is a handy way to use the value of an event header to dynamically decide the indexName and indexType to use when storing the event. + Use this feature with caution, as the event submitter then has control of the indexName and indexType. + Furthermore, if the ElasticSearch REST client is used, the event submitter also has control of the URL path used. + Example for agent named a1: .. 
code-block:: properties http://git-wip-us.apache.org/repos/asf/flume/blob/09472ba1/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java index de84b95..9996142 100644 --- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java @@ -26,6 +26,7 @@ import org.apache.flume.Event; import org.apache.flume.conf.ComponentConfiguration; import org.apache.flume.conf.Configurable; import org.apache.flume.conf.ConfigurableComponent; +import org.apache.flume.formatter.output.BucketPath; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.client.Client; @@ -75,10 +76,14 @@ public abstract class AbstractElasticSearchIndexRequestBuilderFactory public IndexRequestBuilder createIndexRequest(Client client, String indexPrefix, String indexType, Event event) throws IOException { IndexRequestBuilder request = prepareIndex(client); + String realIndexPrefix = BucketPath.escapeString(indexPrefix, event.getHeaders()); + String realIndexType = BucketPath.escapeString(indexType, event.getHeaders()); + TimestampedEvent timestampedEvent = new TimestampedEvent(event); long timestamp = timestampedEvent.getTimestamp(); - String indexName = getIndexName(indexPrefix, timestamp); - prepareIndexRequest(request, indexName, indexType, timestampedEvent); + + String 
indexName = getIndexName(realIndexPrefix, timestamp); + prepareIndexRequest(request, indexName, realIndexType, timestampedEvent); return request; } http://git-wip-us.apache.org/repos/asf/flume/blob/09472ba1/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java index 39b6db5..1d9dfce 100644 --- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java @@ -38,6 +38,7 @@ import org.apache.flume.CounterGroup; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.Transaction; +import org.apache.flume.formatter.output.BucketPath; import org.apache.flume.conf.Configurable; import org.apache.flume.instrumentation.SinkCounter; import org.apache.flume.sink.AbstractSink; @@ -61,8 +62,6 @@ import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEF import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_NAME_BUILDER; import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_NAME_BUILDER_PREFIX; -import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; - /** * A sink which reads events from a channel and writes them to ElasticSearch * based on the work done by https://github.com/Aconex/elasticflume.git.</p> @@ -186,7 +185,8 @@ public class ElasticSearchSink extends AbstractSink implements Configurable { if (event == null) { break; } - client.addEvent(event, indexNameBuilder, indexType, 
ttlMs); + String realIndexType = BucketPath.escapeString(indexType, event.getHeaders()); + client.addEvent(event, indexNameBuilder, realIndexType, ttlMs); } if (count <= 0) { http://git-wip-us.apache.org/repos/asf/flume/blob/09472ba1/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/SimpleIndexNameBuilder.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/SimpleIndexNameBuilder.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/SimpleIndexNameBuilder.java index 19079af..801cac9 100644 --- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/SimpleIndexNameBuilder.java +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/SimpleIndexNameBuilder.java @@ -19,6 +19,7 @@ package org.apache.flume.sink.elasticsearch; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.conf.ComponentConfiguration; +import org.apache.flume.formatter.output.BucketPath; public class SimpleIndexNameBuilder implements IndexNameBuilder { @@ -26,12 +27,12 @@ public class SimpleIndexNameBuilder implements IndexNameBuilder { @Override public String getIndexName(Event event) { - return indexName; + return BucketPath.escapeString(indexName, event.getHeaders()); } @Override public String getIndexPrefix(Event event) { - return indexName; + return BucketPath.escapeString(indexName, event.getHeaders()); } @Override http://git-wip-us.apache.org/repos/asf/flume/blob/09472ba1/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/TimeBasedIndexNameBuilder.java ---------------------------------------------------------------------- diff --git 
a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/TimeBasedIndexNameBuilder.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/TimeBasedIndexNameBuilder.java index a8603a4..c651732 100644 --- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/TimeBasedIndexNameBuilder.java +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/TimeBasedIndexNameBuilder.java @@ -24,6 +24,7 @@ import org.apache.commons.lang.time.FastDateFormat; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.conf.ComponentConfiguration; +import org.apache.flume.formatter.output.BucketPath; import java.util.TimeZone; @@ -60,13 +61,14 @@ public class TimeBasedIndexNameBuilder implements public String getIndexName(Event event) { TimestampedEvent timestampedEvent = new TimestampedEvent(event); long timestamp = timestampedEvent.getTimestamp(); - return new StringBuilder(indexPrefix).append('-') + String realIndexPrefix = BucketPath.escapeString(indexPrefix, event.getHeaders()); + return new StringBuilder(realIndexPrefix).append('-') .append(fastDateFormat.format(timestamp)).toString(); } @Override public String getIndexPrefix(Event event) { - return indexPrefix; + return BucketPath.escapeString(indexPrefix, event.getHeaders()); } @Override http://git-wip-us.apache.org/repos/asf/flume/blob/09472ba1/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchRestClient.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchRestClient.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchRestClient.java index ff95e30..0d1c37f 
100644 --- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchRestClient.java +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchRestClient.java @@ -116,8 +116,6 @@ public class ElasticSearchRestClient implements ElasticSearchClient { public void execute() throws Exception { int statusCode = 0, triesCount = 0; HttpResponse response = null; - logger.info("Sending bulk request to elasticsearch cluster"); - String entity; synchronized (bulkBuilder) { entity = bulkBuilder.toString(); http://git-wip-us.apache.org/repos/asf/flume/blob/09472ba1/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchTransportClient.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchTransportClient.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchTransportClient.java index e9ed0b4..d44c8ad 100644 --- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchTransportClient.java +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchTransportClient.java @@ -178,7 +178,6 @@ public class ElasticSearchTransportClient implements ElasticSearchClient { @Override public void execute() throws Exception { try { - logger.info("Sending bulk to elasticsearch cluster"); BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet(); if (bulkResponse.hasFailures()) { throw new EventDeliveryException(bulkResponse.buildFailureMessage()); 
http://git-wip-us.apache.org/repos/asf/flume/blob/09472ba1/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java index 807a9c7..8022111 100644 --- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java @@ -158,6 +158,27 @@ public class TestElasticSearchIndexRequestBuilderFactory } @Test + public void shouldSetIndexNameTypeFromHeaderWhenPresent() + throws Exception { + String indexPrefix = "%{index-name}"; + String indexType = "%{index-type}"; + String indexValue = "testing-index-name-from-headers"; + String typeValue = "testing-index-type-from-headers"; + + Event event = new SimpleEvent(); + event.getHeaders().put("index-name", indexValue); + event.getHeaders().put("index-type", typeValue); + + IndexRequestBuilder indexRequestBuilder = factory.createIndexRequest( + null, indexPrefix, indexType, event); + + assertEquals(indexValue + '-' + + ElasticSearchIndexRequestBuilderFactory.df.format(FIXED_TIME_MILLIS), + indexRequestBuilder.request().index()); + assertEquals(typeValue, indexRequestBuilder.request().type()); + } + + @Test public void shouldConfigureEventSerializer() throws Exception { assertFalse(serializer.configuredWithContext); factory.configure(new Context());
