Updated Branches: refs/heads/flume-1.4 6ba625c8e -> 2b66fb8d3
FLUME-2015. ElasticSearchSink: need access to IndexRequestBuilder instance during flume event processing (Tim Bacon and Edward Sargisson via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/2b66fb8d Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/2b66fb8d Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/2b66fb8d Branch: refs/heads/flume-1.4 Commit: 2b66fb8d33d86bd7dabbb1b4f911c38c892dbdaa Parents: 6ba625c Author: Mike Percy <[email protected]> Authored: Fri May 10 13:17:32 2013 -0700 Committer: Mike Percy <[email protected]> Committed: Fri May 10 13:18:56 2013 -0700 ---------------------------------------------------------------------- flume-ng-doc/sphinx/FlumeUserGuide.rst | 56 +++-- ...actElasticSearchIndexRequestBuilderFactory.java | 158 ++++++++++++ .../ElasticSearchEventSerializer.java | 8 +- .../ElasticSearchIndexRequestBuilderFactory.java | 58 +++++ .../sink/elasticsearch/ElasticSearchSink.java | 36 ++-- .../EventSerializerIndexRequestBuilderFactory.java | 69 +++++ .../AbstractElasticSearchSinkTest.java | 19 ++- ...estElasticSearchIndexRequestBuilderFactory.java | 194 +++++++++++++++ .../sink/elasticsearch/TestElasticSearchSink.java | 107 ++++++++- 9 files changed, 655 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/2b66fb8d/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index d09a3f7..d129abf 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -1765,31 +1765,47 @@ Example for agent named a1: a1.sinks.k1.channel = c1 ElasticSearchSink -''''''''''''''''' +~~~~~~~~~~~~~~~~~ + +This sink writes data to an elasticsearch cluster. 
By default, events will be written so that the `Kibana <http://kibana.org>`_ graphical interface +can display them - just as if `logstash <https://logstash.net>`_ wrote them. + +The elasticsearch and lucene-core jars required for your environment must be placed in the lib directory of the Apache Flume installation. +Elasticsearch requires that the major version of the client JAR match that of the server and that both are running the same minor version +of the JVM. SerializationExceptions will appear if this is incorrect. To +select the required version first determine the version of elasticsearch and the JVM version the target cluster is running. Then select an elasticsearch client +library which matches the major version. A 0.19.x client can talk to a 0.19.x cluster; 0.20.x can talk to 0.20.x and 0.90.x can talk to 0.90.x. Once the +elasticsearch version has been determined then read the pom.xml file to determine the correct lucene-core JAR version to use. The Flume agent +which is running the ElasticSearchSink should also match the JVM the target cluster is running down to the minor version. + +Events will be written to a new index every day. The name will be <indexName>-yyyy-MM-dd where <indexName> is the indexName parameter. The sink +will start writing to a new index at midnight UTC. + +Events are serialized for elasticsearch by the ElasticSearchLogStashEventSerializer by default. This behaviour can be +overridden with the serializer parameter. This parameter accepts implementations of org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer +or org.apache.flume.sink.elasticsearch.ElasticSearchIndexRequestBuilderFactory. Implementing ElasticSearchEventSerializer is deprecated in favour of +the more powerful ElasticSearchIndexRequestBuilderFactory. -This sink writes data to ElasticSearch. 
A class implementing -ElasticSearchEventSerializer which is specified by the configuration is used to convert the events into -XContentBuilder which detail the fields and mappings which will be indexed. These are then then written -to ElasticSearch. The sink will generate an index per day allowing easier management instead of dealing with -a single large index The type is the FQCN: org.apache.flume.sink.elasticsearch.ElasticSearchSink + Required properties are in **bold**. -================ ================================================================== ======================================================================================================= -Property Name Default Description -================ ================================================================== ======================================================================================================= +================ ======================================================================== ======================================================================================================= +Property Name Default Description +================ ======================================================================== ======================================================================================================= **channel** -- -**type** -- The component type name, needs to be ``elasticsearch`` -**hostNames** -- Comma separated list of hostname:port, if the port is not present the default port '9300' will be used -indexName flume The name of the index which the date will be appended to. Example 'flume' -> 'flume-yyyy-MM-dd' -indexType logs The type to index the document to, defaults to 'log' -clusterName elasticsearch Name of the ElasticSearch cluster to connect to -batchSize 100 Number of events to be written per txn. 
-ttl -- TTL in days, when set will cause the expired documents to be deleted automatically, - if not set documents will never be automatically deleted -serializer org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer -serializer.* -- Properties to be passed to the serializer. -================ ================================================================== ======================================================================================================= +**type** -- The component type name, needs to be ``org.apache.flume.sink.elasticsearch.ElasticSearchSink`` +**hostNames** -- Comma separated list of hostname:port, if the port is not present the default port '9300' will be used +indexName flume The name of the index which the date will be appended to. Example 'flume' -> 'flume-yyyy-MM-dd' +indexType logs The type to index the document to, defaults to 'log' +clusterName elasticsearch Name of the ElasticSearch cluster to connect to +batchSize 100 Number of events to be written per txn. +ttl -- TTL in days, when set will cause the expired documents to be deleted automatically, + if not set documents will never be automatically deleted +serializer org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer The ElasticSearchIndexRequestBuilderFactory or ElasticSearchEventSerializer to use. Implementations of + either class are accepted but ElasticSearchIndexRequestBuilderFactory is preferred. +serializer.* -- Properties to be passed to the serializer. 
+================ ======================================================================== ======================================================================================================= Example for agent named a1: http://git-wip-us.apache.org/repos/asf/flume/blob/2b66fb8d/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java new file mode 100644 index 0000000..6effe34 --- /dev/null +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.flume.sink.elasticsearch; + +import java.io.IOException; +import java.util.Map; + +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang.time.FastDateFormat; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.conf.ComponentConfiguration; +import org.apache.flume.conf.Configurable; +import org.apache.flume.conf.ConfigurableComponent; +import org.apache.flume.event.SimpleEvent; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.client.Client; +import org.joda.time.DateTimeUtils; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Maps; + +/** + * Abstract base class for custom implementations of + * {@link ElasticSearchIndexRequestBuilderFactory}. + */ +public abstract class AbstractElasticSearchIndexRequestBuilderFactory + implements ElasticSearchIndexRequestBuilderFactory { + + /** + * {@link FastDateFormat} to use for index names + * in {@link #getIndexName(String, long)} + */ + protected final FastDateFormat fastDateFormat; + + /** + * Constructor for subclasses + * @param fastDateFormat {@link FastDateFormat} to use for index names + */ + protected AbstractElasticSearchIndexRequestBuilderFactory( + FastDateFormat fastDateFormat) { + this.fastDateFormat = fastDateFormat; + } + + /** + * @see Configurable + */ + @Override + public abstract void configure(Context arg0); + + /** + * @see ConfigurableComponent + */ + @Override + public abstract void configure(ComponentConfiguration arg0); + + /** + * Creates and prepares an {@link IndexRequestBuilder} from the supplied + * {@link Client} via delegation to the subclass-hook template methods + * {@link #getIndexName(String, long)} and + * {@link #prepareIndexRequest(IndexRequestBuilder, String, String, Event)} + */ + @Override + public IndexRequestBuilder createIndexRequest(Client client, + String indexPrefix, String indexType, Event event) throws IOException 
{ + IndexRequestBuilder request = prepareIndex(client); + TimestampedEvent timestampedEvent = new TimestampedEvent(event); + long timestamp = timestampedEvent.getTimestamp(); + String indexName = getIndexName(indexPrefix, timestamp); + prepareIndexRequest(request, indexName, indexType, timestampedEvent); + return request; + } + + @VisibleForTesting + IndexRequestBuilder prepareIndex(Client client) { + return client.prepareIndex(); + } + + /** + * Gets the name of the index to use for an index request + * @return index name of the form 'indexPrefix-formattedTimestamp' + * @param indexPrefix + * Prefix of index name to use -- as configured on the sink + * @param timestamp + * timestamp (millis) to format / use + */ + protected String getIndexName(String indexPrefix, long timestamp) { + return new StringBuilder(indexPrefix).append('-') + .append(fastDateFormat.format(timestamp)).toString(); + } + + /** + * Prepares an ElasticSearch {@link IndexRequestBuilder} instance + * @param indexRequest + * The (empty) ElasticSearch {@link IndexRequestBuilder} to prepare + * @param indexName + * Index name to use -- as per {@link #getIndexName(String, long)} + * @param indexType + * Index type to use -- as configured on the sink + * @param event + * Flume event to serialize and add to index request + * @throws IOException + * If an error occurs e.g. during serialization + */ + protected abstract void prepareIndexRequest( + IndexRequestBuilder indexRequest, String indexName, + String indexType, Event event) throws IOException; + +} + +/** + * {@link Event} implementation that has a timestamp. 
+ * The timestamp is taken from (in order of precedence):<ol> + * <li>The "timestamp" header of the base event, if present</li> + * <li>The "@timestamp" header of the base event, if present</li> + * <li>The current time in millis, otherwise</li> + * </ol> + */ +final class TimestampedEvent extends SimpleEvent { + + private final long timestamp; + + TimestampedEvent(Event base) { + setBody(base.getBody()); + Map<String, String> headers = Maps.newHashMap(base.getHeaders()); + String timestampString = headers.get("timestamp"); + if (StringUtils.isBlank(timestampString)) { + timestampString = headers.get("@timestamp"); + } + if (StringUtils.isBlank(timestampString)) { + this.timestamp = DateTimeUtils.currentTimeMillis(); + headers.put("timestamp", String.valueOf(timestamp )); + } else { + this.timestamp = Long.valueOf(timestampString); + } + setHeaders(headers); + } + + long getTimestamp() { + return timestamp; + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/2b66fb8d/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchEventSerializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchEventSerializer.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchEventSerializer.java index dc6a093..c89d627 100644 --- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchEventSerializer.java +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchEventSerializer.java @@ -24,7 +24,7 @@ import java.nio.charset.Charset; import org.apache.flume.Event; import org.apache.flume.conf.Configurable; import org.apache.flume.conf.ConfigurableComponent; -import org.elasticsearch.common.xcontent.XContentBuilder; +import 
org.elasticsearch.common.io.BytesStream; /** * Interface for an event serializer which serializes the headers and body of an @@ -37,12 +37,12 @@ public interface ElasticSearchEventSerializer extends Configurable, public static final Charset charset = Charset.defaultCharset(); /** - * Return an {@link XContentBuilder} made up of the serialized flume event + * Return an {@link BytesStream} made up of the serialized flume event * @param event * The flume event to serialize - * @return A {@link XContentBuilder} used to write to ElasticSearch + * @return A {@link BytesStream} used to write to ElasticSearch * @throws IOException * If an error occurs during serialization */ - abstract XContentBuilder getContentBuilder(Event event) throws IOException; + abstract BytesStream getContentBuilder(Event event) throws IOException; } http://git-wip-us.apache.org/repos/asf/flume/blob/2b66fb8d/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchIndexRequestBuilderFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchIndexRequestBuilderFactory.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchIndexRequestBuilderFactory.java new file mode 100644 index 0000000..8e77a1e --- /dev/null +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchIndexRequestBuilderFactory.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.sink.elasticsearch; + +import java.io.IOException; +import java.util.TimeZone; + +import org.apache.commons.lang.time.FastDateFormat; +import org.apache.flume.Event; +import org.apache.flume.conf.Configurable; +import org.apache.flume.conf.ConfigurableComponent; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.client.Client; + +/** + * Interface for creating ElasticSearch {@link IndexRequestBuilder} + * instances from serialized flume events. This is configurable, so any config + * params required should be taken through this. + */ +public interface ElasticSearchIndexRequestBuilderFactory extends Configurable, + ConfigurableComponent { + + static final FastDateFormat df = FastDateFormat.getInstance("yyyy-MM-dd", + TimeZone.getTimeZone("Etc/UTC")); + + /** + * @return prepared ElasticSearch {@link IndexRequestBuilder} instance + * @param client + * ElasticSearch {@link Client} to prepare index from + * @param indexPrefix + * Prefix of index name to use -- as configured on the sink + * @param indexType + * Index type to use -- as configured on the sink + * @param event + * Flume event to serialize and add to index request + * @throws IOException + * If an error occurs e.g. 
during serialization + */ + IndexRequestBuilder createIndexRequest(Client client, + String indexPrefix, String indexType, Event event) throws IOException; + +} http://git-wip-us.apache.org/repos/asf/flume/blob/2b66fb8d/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java index 1b3db14..3286412 100644 --- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java @@ -33,11 +33,9 @@ import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.SER import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.TTL; import java.util.Arrays; -import java.util.Date; import java.util.concurrent.TimeUnit; import org.apache.commons.lang.StringUtils; -import org.apache.commons.lang.time.FastDateFormat; import org.apache.flume.Channel; import org.apache.flume.Context; import org.apache.flume.CounterGroup; @@ -55,7 +53,6 @@ import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; -import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.node.Node; import org.elasticsearch.node.NodeBuilder; import org.slf4j.Logger; @@ -90,8 +87,6 @@ public class ElasticSearchSink extends AbstractSink implements Configurable { private static final Logger logger = LoggerFactory 
.getLogger(ElasticSearchSink.class); - static final FastDateFormat df = FastDateFormat.getInstance("yyyy-MM-dd"); - // Used for testing private boolean isLocal = false; private final CounterGroup counterGroup = new CounterGroup(); @@ -108,7 +103,7 @@ public class ElasticSearchSink extends AbstractSink implements Configurable { private Node node; private Client client; - private ElasticSearchEventSerializer serializer; + private ElasticSearchIndexRequestBuilderFactory indexRequestFactory; private SinkCounter sinkCounter; /** @@ -145,7 +140,7 @@ public class ElasticSearchSink extends AbstractSink implements Configurable { @VisibleForTesting String getIndexName() { - return indexName + "-" + df.format(new Date()); + return indexName; } @VisibleForTesting @@ -166,7 +161,6 @@ public class ElasticSearchSink extends AbstractSink implements Configurable { Transaction txn = channel.getTransaction(); try { txn.begin(); - String indexName = getIndexName(); BulkRequestBuilder bulkRequest = client.prepareBulk(); for (int i = 0; i < batchSize; i++) { Event event = channel.take(); @@ -175,15 +169,15 @@ public class ElasticSearchSink extends AbstractSink implements Configurable { break; } - XContentBuilder builder = serializer.getContentBuilder(event); - IndexRequestBuilder request = client.prepareIndex(indexName, indexType) - .setSource(builder); + IndexRequestBuilder indexRequest = + indexRequestFactory.createIndexRequest( + client, indexName, indexType, event); if (ttlMs > 0) { - request.setTTL(ttlMs); + indexRequest.setTTL(ttlMs); } - bulkRequest.add(request); + bulkRequest.add(indexRequest); } int size = bulkRequest.numberOfActions(); @@ -291,10 +285,20 @@ public class ElasticSearchSink extends AbstractSink implements Configurable { try { @SuppressWarnings("unchecked") - Class<? extends ElasticSearchEventSerializer> clazz = (Class<? extends ElasticSearchEventSerializer>) Class + Class<? extends Configurable> clazz = (Class<? 
extends Configurable>) Class .forName(serializerClazz); - serializer = clazz.newInstance(); - serializer.configure(serializerContext); + Configurable serializer = clazz.newInstance(); + if (serializer instanceof ElasticSearchIndexRequestBuilderFactory) { + indexRequestFactory = (ElasticSearchIndexRequestBuilderFactory) serializer; + } else if (serializer instanceof ElasticSearchEventSerializer){ + indexRequestFactory = new EventSerializerIndexRequestBuilderFactory( + (ElasticSearchEventSerializer) serializer); + } else { + throw new IllegalArgumentException( + serializerClazz + " is neither an ElasticSearchEventSerializer" + + " nor an ElasticSearchIndexRequestBuilderFactory."); + } + indexRequestFactory.configure(serializerContext); } catch (Exception e) { logger.error("Could not instantiate event serializer.", e); Throwables.propagate(e); http://git-wip-us.apache.org/repos/asf/flume/blob/2b66fb8d/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/EventSerializerIndexRequestBuilderFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/EventSerializerIndexRequestBuilderFactory.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/EventSerializerIndexRequestBuilderFactory.java new file mode 100644 index 0000000..c71b2e5 --- /dev/null +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/EventSerializerIndexRequestBuilderFactory.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume.sink.elasticsearch; + +import java.io.IOException; + +import org.apache.commons.lang.time.FastDateFormat; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.conf.ComponentConfiguration; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.common.io.BytesStream; + +/** + * Default implementation of {@link ElasticSearchIndexRequestBuilderFactory}. + * It serializes flume events using the + * {@link ElasticSearchEventSerializer} instance configured on the sink. 
+ */ +public class EventSerializerIndexRequestBuilderFactory + extends AbstractElasticSearchIndexRequestBuilderFactory { + + protected final ElasticSearchEventSerializer serializer; + + public EventSerializerIndexRequestBuilderFactory( + ElasticSearchEventSerializer serializer) { + this(serializer, ElasticSearchIndexRequestBuilderFactory.df); + } + + protected EventSerializerIndexRequestBuilderFactory( + ElasticSearchEventSerializer serializer, FastDateFormat fdf) { + super(fdf); + this.serializer = serializer; + } + + @Override + public void configure(Context context) { + serializer.configure(context); + } + + @Override + public void configure(ComponentConfiguration config) { + serializer.configure(config); + } + + @Override + protected void prepareIndexRequest(IndexRequestBuilder indexRequest, + String indexName, String indexType, Event event) throws IOException { + BytesStream contentBuilder = serializer.getContentBuilder(event); + indexRequest.setIndex(indexName) + .setType(indexType) + .setSource(contentBuilder.bytes()); + } +} http://git-wip-us.apache.org/repos/asf/flume/blob/2b66fb8d/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java index 2edacdc..ecbdd99 100644 --- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java @@ -27,7 +27,6 @@ import static org.junit.Assert.assertEquals; import java.util.Arrays; import java.util.Comparator; 
-import java.util.Date; import java.util.Map; import org.apache.flume.Channel; @@ -48,12 +47,16 @@ import org.elasticsearch.node.NodeBuilder; import org.elasticsearch.node.internal.InternalNode; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; +import org.joda.time.DateTimeUtils; +import org.junit.After; +import org.junit.Before; public abstract class AbstractElasticSearchSinkTest { static final String DEFAULT_INDEX_NAME = "flume"; static final String DEFAULT_INDEX_TYPE = "log"; static final String DEFAULT_CLUSTER_NAME = "elasticsearch"; + static final long FIXED_TIME_MILLIS = 123456789L; Node node; Client client; @@ -68,8 +71,8 @@ public abstract class AbstractElasticSearchSinkTest { parameters.put(BATCH_SIZE, "1"); parameters.put(TTL, "5"); - timestampedIndexName = DEFAULT_INDEX_NAME + "-" - + ElasticSearchSink.df.format(new Date()); + timestampedIndexName = DEFAULT_INDEX_NAME + '-' + + ElasticSearchIndexRequestBuilderFactory.df.format(FIXED_TIME_MILLIS); } void createNodes() throws Exception { @@ -94,6 +97,16 @@ public abstract class AbstractElasticSearchSinkTest { node.close(); } + @Before + public void setFixedJodaTime() { + DateTimeUtils.setCurrentMillisFixed(FIXED_TIME_MILLIS); + } + + @After + public void resetJodaTime() { + DateTimeUtils.setCurrentMillisSystem(); + } + Channel bindAndStartChannel(ElasticSearchSink fixture) { // Configure the channel Channel channel = new MemoryChannel(); http://git-wip-us.apache.org/repos/asf/flume/blob/2b66fb8d/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java 
b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java new file mode 100644 index 0000000..1e4e119 --- /dev/null +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.flume.sink.elasticsearch;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Locale;
+import java.util.Map;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.conf.ComponentConfiguration;
+import org.apache.flume.conf.sink.SinkConfiguration;
+import org.apache.flume.event.SimpleEvent;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.io.BytesStream;
+import org.elasticsearch.common.io.FastByteArrayOutputStream;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Unit tests for {@link EventSerializerIndexRequestBuilderFactory} and for
+ * {@link TimestampedEvent}.
+ *
+ * No Elasticsearch client is involved: {@code prepareIndex} is overridden to
+ * build requests around a {@code null} client, and serialization is delegated
+ * to {@link FakeEventSerializer}.
+ */
+public class TestElasticSearchIndexRequestBuilderFactory
+    extends AbstractElasticSearchSinkTest {
+
+  private static final Client FAKE_CLIENT = null;
+
+  private EventSerializerIndexRequestBuilderFactory factory;
+
+  private FakeEventSerializer serializer;
+
+  @Before
+  public void setupFactory() throws Exception {
+    serializer = new FakeEventSerializer();
+    factory = new EventSerializerIndexRequestBuilderFactory(serializer) {
+      @Override
+      IndexRequestBuilder prepareIndex(Client client) {
+        // Ignore the supplied client and build the request around FAKE_CLIENT.
+        return new IndexRequestBuilder(FAKE_CLIENT);
+      }
+    };
+  }
+
+  @Test
+  public void shouldUseUtcAsBasisForDateFormat() {
+    // Pin the locale: the no-arg getDisplayName() is localized to the JVM's
+    // default locale and would break this comparison on non-English hosts.
+    assertEquals("Coordinated Universal Time",
+        factory.fastDateFormat.getTimeZone().getDisplayName(Locale.ENGLISH));
+  }
+
+  @Test
+  public void indexNameShouldBePrefixDashFormattedTimestamp() {
+    long millis = 987654321L;
+    assertEquals("prefix-" + factory.fastDateFormat.format(millis),
+        factory.getIndexName("prefix", millis));
+  }
+
+  @Test
+  public void shouldEnsureTimestampHeaderPresentInTimestampedEvent() {
+    SimpleEvent base = new SimpleEvent();
+
+    // FIXED_TIME_MILLIS is not declared in this class; presumably it comes
+    // from the base test class together with a fixed clock.
+    TimestampedEvent timestampedEvent = new TimestampedEvent(base);
+    assertEquals(FIXED_TIME_MILLIS, timestampedEvent.getTimestamp());
+    assertEquals(String.valueOf(FIXED_TIME_MILLIS),
+        timestampedEvent.getHeaders().get("timestamp"));
+  }
+
+  @Test
+  public void shouldUseExistingTimestampHeaderInTimestampedEvent() {
+    SimpleEvent base = new SimpleEvent();
+    Map<String, String> headersWithTimestamp = Maps.newHashMap();
+    headersWithTimestamp.put("timestamp", "-321");
+    base.setHeaders(headersWithTimestamp);
+
+    TimestampedEvent timestampedEvent = new TimestampedEvent(base);
+    assertEquals(-321L, timestampedEvent.getTimestamp());
+    assertEquals("-321", timestampedEvent.getHeaders().get("timestamp"));
+  }
+
+  @Test
+  public void shouldUseExistingAtTimestampHeaderInTimestampedEvent() {
+    SimpleEvent base = new SimpleEvent();
+    Map<String, String> headersWithTimestamp = Maps.newHashMap();
+    headersWithTimestamp.put("@timestamp", "-999");
+    base.setHeaders(headersWithTimestamp);
+
+    TimestampedEvent timestampedEvent = new TimestampedEvent(base);
+    assertEquals(-999L, timestampedEvent.getTimestamp());
+    assertEquals("-999", timestampedEvent.getHeaders().get("@timestamp"));
+    // An "@timestamp" header must not be duplicated under "timestamp".
+    assertNull(timestampedEvent.getHeaders().get("timestamp"));
+  }
+
+  @Test
+  public void shouldPreserveBodyAndNonTimestampHeadersInTimestampedEvent() {
+    SimpleEvent base = new SimpleEvent();
+    base.setBody(new byte[] {1,2,3,4});
+    Map<String, String> headers = Maps.newHashMap();
+    headers.put("foo", "bar");
+    base.setHeaders(headers);
+
+    TimestampedEvent timestampedEvent = new TimestampedEvent(base);
+    assertEquals("bar", timestampedEvent.getHeaders().get("foo"));
+    assertArrayEquals(base.getBody(), timestampedEvent.getBody());
+  }
+
+  @Test
+  public void shouldSetIndexNameTypeAndSerializedEventIntoIndexRequest()
+      throws Exception {
+
+    String indexPrefix = "qwerty";
+    String indexType = "uiop";
+    Event event = new SimpleEvent();
+
+    IndexRequestBuilder indexRequestBuilder = factory.createIndexRequest(
+        FAKE_CLIENT, indexPrefix, indexType, event);
+
+    assertEquals(indexPrefix + '-'
+        + ElasticSearchIndexRequestBuilderFactory.df.format(FIXED_TIME_MILLIS),
+        indexRequestBuilder.request().index());
+    assertEquals(indexType, indexRequestBuilder.request().type());
+    assertArrayEquals(FakeEventSerializer.FAKE_BYTES,
+        indexRequestBuilder.request().source().array());
+  }
+
+  @Test
+  public void shouldSetIndexNameFromTimestampHeaderWhenPresent()
+      throws Exception {
+    String indexPrefix = "qwerty";
+    String indexType = "uiop";
+    Event event = new SimpleEvent();
+    event.getHeaders().put("timestamp", "1213141516");
+
+    // Same fake client as the sibling test instead of a bare null literal.
+    IndexRequestBuilder indexRequestBuilder = factory.createIndexRequest(
+        FAKE_CLIENT, indexPrefix, indexType, event);
+
+    assertEquals(indexPrefix + '-'
+        + ElasticSearchIndexRequestBuilderFactory.df.format(1213141516L),
+        indexRequestBuilder.request().index());
+  }
+
+  @Test
+  public void shouldConfigureEventSerializer() throws Exception {
+    assertFalse(serializer.configuredWithContext);
+    factory.configure(new Context());
+    assertTrue(serializer.configuredWithContext);
+
+    assertFalse(serializer.configuredWithComponentConfiguration);
+    factory.configure(new SinkConfiguration("name"));
+    assertTrue(serializer.configuredWithComponentConfiguration);
+  }
+
+}
+
+/**
+ * Serializer stub that always emits {@link #FAKE_BYTES} and records which
+ * {@code configure} overload has been called.
+ */
+class FakeEventSerializer implements ElasticSearchEventSerializer {
+
+  static final byte[] FAKE_BYTES = new byte[] {9,8,7,6};
+  boolean configuredWithContext, configuredWithComponentConfiguration;
+
+  @Override
+  public BytesStream getContentBuilder(Event event) throws IOException {
+    // The event is ignored; the output is always the four fake bytes.
+    FastByteArrayOutputStream fbaos = new FastByteArrayOutputStream(4);
+    fbaos.write(FAKE_BYTES);
+    return fbaos;
+  }
+
+  @Override
+  public void configure(Context arg0) {
+    configuredWithContext = true;
+  }
+
+  @Override
+  public void configure(ComponentConfiguration arg0) {
+    configuredWithComponentConfiguration = true;
+  }
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/2b66fb8d/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java index 94b95b1..ad40a3c 100644 --- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java @@ -24,20 +24,27 @@ import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEF import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.HOSTNAMES; import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_NAME; import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_TYPE; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.SERIALIZER; import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.TTL; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; -import java.util.Date; +import java.io.IOException; +import java.util.TimeZone; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang.time.FastDateFormat; import org.apache.flume.Channel; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.Sink.Status; import org.apache.flume.Transaction; +import org.apache.flume.conf.ComponentConfiguration; +import org.apache.flume.conf.Configurable; import org.apache.flume.conf.Configurables; import org.apache.flume.event.EventBuilder; 
+import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.client.Requests;
 import org.elasticsearch.common.UUID;
 import org.elasticsearch.common.transport.InetSocketTransportAddress;
@@ -112,6 +119,7 @@ public class TestElasticSearchSink extends AbstractElasticSearchSinkTest {
     assertMatchAllQuery(numberOfEvents, events);
     assertBodyQuery(5, events);
   }
+
   @Test
   public void shouldIndexFiveEventsOverThreeBatches() throws Exception {
     parameters.put(BATCH_SIZE, "2");
@@ -163,9 +171,7 @@ public class TestElasticSearchSink extends AbstractElasticSearchSinkTest {
         "10.5.5.27", DEFAULT_PORT) };

     assertEquals("testing-cluster-name", fixture.getClusterName());
-    assertEquals(
-        "testing-index-name-" + ElasticSearchSink.df.format(new Date()),
-        fixture.getIndexName());
+    assertEquals("testing-index-name", fixture.getIndexName());
     assertEquals("testing-index-type", fixture.getIndexType());
     assertEquals(TimeUnit.DAYS.toMillis(10), fixture.getTTLMs());
     assertArrayEquals(expected, fixture.getServerAddresses());
@@ -184,9 +190,7 @@ public class TestElasticSearchSink extends AbstractElasticSearchSinkTest {
     InetSocketTransportAddress[] expected = { new InetSocketTransportAddress(
         "10.5.5.27", DEFAULT_PORT) };

-    assertEquals(
-        DEFAULT_INDEX_NAME + "-" + ElasticSearchSink.df.format(new Date()),
-        fixture.getIndexName());
+    assertEquals(DEFAULT_INDEX_NAME, fixture.getIndexName());
     assertEquals(DEFAULT_INDEX_TYPE, fixture.getIndexType());
     assertEquals(DEFAULT_CLUSTER_NAME, fixture.getClusterName());
     assertArrayEquals(expected, fixture.getServerAddresses());
@@ -221,4 +225,109 @@ public class TestElasticSearchSink extends AbstractElasticSearchSinkTest {
     assertArrayEquals(expected, fixture.getServerAddresses());
   }
+
+  @Test
+  public void shouldAllowCustomElasticSearchIndexRequestBuilderFactory()
+      throws Exception {
+
+    parameters.put(SERIALIZER,
+        CustomElasticSearchIndexRequestBuilderFactory.class.getName());
+
+    Configurables.configure(fixture, new Context(parameters));
+
+    Channel channel = bindAndStartChannel(fixture);
+    Transaction tx = channel.getTransaction();
+    tx.begin();
+    String body = "{ foo: \"bar\" }";
+    Event event = EventBuilder.withBody(body.getBytes());
+    channel.put(event);
+    tx.commit();
+    tx.close();
+
+    fixture.process();
+    fixture.stop();
+
+    // The suffix follows the HH_mm_ss_SSS / EST5EDT date format that the
+    // custom factory hands to its base class.
+    assertEquals(fixture.getIndexName() + "-05_17_36_789",
+        CustomElasticSearchIndexRequestBuilderFactory.actualIndexName);
+    assertEquals(fixture.getIndexType(),
+        CustomElasticSearchIndexRequestBuilderFactory.actualIndexType);
+    assertArrayEquals(event.getBody(),
+        CustomElasticSearchIndexRequestBuilderFactory.actualEventBody);
+    assertTrue(CustomElasticSearchIndexRequestBuilderFactory.hasContext);
+  }
+
+  /**
+   * Records the prepareIndexRequest arguments and the configure(Context) call
+   * in static fields. The test only passes this class's name to the sink via
+   * the SERIALIZER parameter, so it holds no instance to query afterwards.
+   */
+  public static final class CustomElasticSearchIndexRequestBuilderFactory
+      extends AbstractElasticSearchIndexRequestBuilderFactory {
+
+    static String actualIndexName, actualIndexType;
+    static byte[] actualEventBody;
+    static boolean hasContext;
+
+    public CustomElasticSearchIndexRequestBuilderFactory() {
+      super(FastDateFormat.getInstance("HH_mm_ss_SSS",
+          TimeZone.getTimeZone("EST5EDT")));
+    }
+
+    @Override
+    protected void prepareIndexRequest(IndexRequestBuilder indexRequest,
+        String indexName, String indexType, Event event) throws IOException {
+      actualIndexName = indexName;
+      actualIndexType = indexType;
+      actualEventBody = event.getBody();
+      indexRequest.setIndex(indexName).setType(indexType)
+          .setSource(event.getBody());
+    }
+
+    @Override
+    public void configure(Context arg0) {
+      hasContext = true;
+    }
+
+    @Override
+    public void configure(ComponentConfiguration arg0) {
+      //no-op
+    }
+  }
+
+  /** Configuring the sink with an invalid SERIALIZER class must throw. */
+  @Test
+  public void shouldFailToConfigureWithInvalidSerializerClass()
+      throws Exception {
+
+    // Record whether the expected exception was actually raised; a bare
+    // try/catch would also pass when configure() accepts the class.
+    boolean rejected = false;
+    parameters.put(SERIALIZER, "java.lang.String");
+    try {
+      Configurables.configure(fixture, new Context(parameters));
+    } catch (ClassCastException e) {
+      rejected = true;
+    }
+    assertTrue("java.lang.String must be rejected as serializer", rejected);
+
+    rejected = false;
+    parameters.put(SERIALIZER, FakeConfigurable.class.getName());
+    try {
+      Configurables.configure(fixture, new Context(parameters));
+    } catch (IllegalArgumentException e) {
+      rejected = true;
+    }
+    assertTrue("a plain Configurable must be rejected as serializer",
+        rejected);
+  }
+
+  /** Implements only Configurable, i.e. none of the serializer interfaces. */
+  public static class FakeConfigurable implements Configurable {
+    @Override
+    public void configure(Context arg0) {
+      // no-op
+    }
+  }

 }
