This is an automated email from the ASF dual-hosted git repository. nickallen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/metron.git
The following commit(s) were added to refs/heads/master by this push: new ec3b98f METRON-1849 Elasticsearch Index Write Functionality Should be Shared (nickwallen) closes apache/metron#1254 ec3b98f is described below commit ec3b98f762ce3726ed9a33abdb446957d1865dca Author: nickwallen <n...@nickallen.org> AuthorDate: Tue Dec 11 12:59:08 2018 -0500 METRON-1849 Elasticsearch Index Write Functionality Should be Shared (nickwallen) closes apache/metron#1254 --- .../elasticsearch/bulk/BulkDocumentWriter.java | 45 +++ .../bulk/BulkDocumentWriterResults.java | 68 ++++ .../bulk/ElasticsearchBulkDocumentWriter.java | 166 ++++++++++ .../metron/elasticsearch/bulk/WriteFailure.java | 48 +++ .../metron/elasticsearch/bulk/WriteSuccess.java | 36 +++ .../metron/elasticsearch/dao/ElasticsearchDao.java | 2 +- .../elasticsearch/dao/ElasticsearchUpdateDao.java | 144 ++++----- .../elasticsearch/writer/ElasticsearchWriter.java | 157 +++++---- .../elasticsearch/writer/TupleBasedDocument.java | 44 +++ .../bulk/ElasticsearchBulkDocumentWriterTest.java | 178 ++++++++++ .../components/ElasticSearchComponent.java | 46 +-- .../writer/ElasticsearchWriterTest.java | 360 ++++++++++++++------- .../src/test/resources/log4j.properties | 0 13 files changed, 994 insertions(+), 300 deletions(-) diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/BulkDocumentWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/BulkDocumentWriter.java new file mode 100644 index 0000000..34f543e --- /dev/null +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/BulkDocumentWriter.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.elasticsearch.bulk; + +import org.apache.metron.indexing.dao.update.Document; + +/** + * Writes documents to an index in bulk. + * + * @param <D> The type of document to write. + */ +public interface BulkDocumentWriter<D extends Document> { + + /** + * Add a document to the batch. + * @param document The document to write. + * @param index The name of the index to write to. + */ + void addDocument(D document, String index); + + /** + * @return The number of documents waiting to be written. + */ + int size(); + + /** + * Write all documents in the batch. + */ + BulkDocumentWriterResults<D> write(); +} diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/BulkDocumentWriterResults.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/BulkDocumentWriterResults.java new file mode 100644 index 0000000..90e5ce3 --- /dev/null +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/BulkDocumentWriterResults.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.elasticsearch.bulk; + +import org.apache.metron.indexing.dao.update.Document; + +import java.util.ArrayList; +import java.util.List; + +/** + * The result of writing documents in bulk using a {@link BulkDocumentWriter}. + * @param <D> The type of documents to write. + */ +public class BulkDocumentWriterResults<D extends Document> { + + private List<WriteSuccess<D>> successes; + private List<WriteFailure<D>> failures; + + public BulkDocumentWriterResults() { + this.successes = new ArrayList<>(); + this.failures = new ArrayList<>(); + } + + public void add(WriteSuccess<D> success) { + this.successes.add(success); + } + + public void addSuccess(D success) { + add(new WriteSuccess<D>(success)); + } + + public void addSuccesses(List<D> successes) { + for(D success: successes) { + addSuccess(success); + } + } + + public List<WriteSuccess<D>> getSuccesses() { + return successes; + } + + public void add(WriteFailure<D> failure) { + this.failures.add(failure); + } + + public void addFailure(D document, Throwable cause, String message) { + add(new WriteFailure(document, cause, message)); + } + + public List<WriteFailure<D>> getFailures() { + return failures; + } +} diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriter.java new file mode 100644 index 0000000..9e6e568 --- /dev/null +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriter.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.elasticsearch.bulk; + +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.metron.elasticsearch.client.ElasticsearchClient; +import org.apache.metron.indexing.dao.update.Document; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.WriteRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +/** + * Writes documents to an Elasticsearch index in bulk. + * + * @param <D> The type of document to write. + */ +public class ElasticsearchBulkDocumentWriter<D extends Document> implements BulkDocumentWriter<D> { + + /** + * A {@link Document} along with the index it will be written to. + */ + private class Indexable { + D document; + String index; + + public Indexable(D document, String index) { + this.document = document; + this.index = index; + } + } + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private ElasticsearchClient client; + private List<Indexable> documents; + private WriteRequest.RefreshPolicy refreshPolicy; + + public ElasticsearchBulkDocumentWriter(ElasticsearchClient client) { + this.client = client; + this.documents = new ArrayList<>(); + this.refreshPolicy = WriteRequest.RefreshPolicy.NONE; + } + + @Override + public void addDocument(D document, String indexName) { + documents.add(new Indexable(document, indexName)); + LOG.debug("Adding document to batch; document={}, index={}", document, indexName); + } + + @Override + public BulkDocumentWriterResults<D> write() { + BulkDocumentWriterResults<D> results = new BulkDocumentWriterResults<>(); + try { + // create an index request for each document + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.setRefreshPolicy(refreshPolicy); + for(Indexable doc: documents) { + DocWriteRequest request = createRequest(doc.document, doc.index); + bulkRequest.add(request); + } + + // submit the request and handle the response + BulkResponse bulkResponse = client.getHighLevelClient().bulk(bulkRequest); + handleBulkResponse(bulkResponse, documents, results); + + } catch(IOException e) { + // assume all documents have failed + for(Indexable indexable: documents) { + D failed = indexable.document; + results.addFailure(failed, e, ExceptionUtils.getRootCauseMessage(e)); + } + LOG.error("Failed to submit bulk request; all documents failed", e); + + } finally { + // flush all documents no matter which ones succeeded or failed + documents.clear(); + } + + LOG.debug("Wrote document(s) to Elasticsearch; batchSize={}, success={}, failed={}", + documents.size(), results.getSuccesses().size(), results.getFailures().size()); + return results; + } + + @Override + public int size() { + return documents.size(); + } + + public ElasticsearchBulkDocumentWriter<D> withRefreshPolicy(WriteRequest.RefreshPolicy refreshPolicy) { + this.refreshPolicy = refreshPolicy; + return this; + } + + private IndexRequest createRequest(D document, String index) { + if(document.getTimestamp() == null) { + throw new IllegalArgumentException("Document must contain the timestamp"); + } + return new IndexRequest() + .source(document.getDocument()) + .type(document.getSensorType() + "_doc") + .id(document.getGuid()) + .index(index) + .timestamp(document.getTimestamp().toString()); + } + + /** + * Handles the {@link BulkResponse} received from Elasticsearch. + * @param bulkResponse The response received from Elasticsearch. + * @param documents The documents included in the bulk request. + * @param results The writer results. + */ + private void handleBulkResponse(BulkResponse bulkResponse, List<Indexable> documents, BulkDocumentWriterResults<D> results) { + if (bulkResponse.hasFailures()) { + + // interrogate the response to distinguish between those that succeeded and those that failed + for(BulkItemResponse response: bulkResponse) { + if(response.isFailed()) { + // request failed + D failed = getDocument(response.getItemId()); + Exception cause = response.getFailure().getCause(); + String message = response.getFailureMessage(); + results.addFailure(failed, cause, message); + + } else { + // request succeeded + D success = getDocument(response.getItemId()); + results.addSuccess(success); + } + } + } else { + // all requests succeeded + for(Indexable success: documents) { + results.addSuccess(success.document); + } + } + } + + private D getDocument(int index) { + return documents.get(index).document; + } +} diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/WriteFailure.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/WriteFailure.java new file mode 100644 index 0000000..ac571c7 --- /dev/null +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/WriteFailure.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.elasticsearch.bulk; + +import org.apache.metron.indexing.dao.update.Document; + +/** + * Indicates that a document failed to be written by a {@link BulkDocumentWriter}. + * @param <D> The type of document that failed to write. + */ +public class WriteFailure <D extends Document> { + private D document; + private Throwable cause; + private String message; + + public WriteFailure(D document, Throwable cause, String message) { + this.document = document; + this.cause = cause; + this.message = message; + } + + public D getDocument() { + return document; + } + + public Throwable getCause() { + return cause; + } + + public String getMessage() { + return message; + } +} diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/WriteSuccess.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/WriteSuccess.java new file mode 100644 index 0000000..a86325d --- /dev/null +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/WriteSuccess.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.elasticsearch.bulk; + +import org.apache.metron.indexing.dao.update.Document; + +/** + * Indicates that a document was successfully written by a {@link BulkDocumentWriter}. + * @param <D> The type of document written. + */ +public class WriteSuccess <D extends Document> { + private D document; + + public WriteSuccess(D document) { + this.document = document; + } + + public D getDocument() { + return document; + } +} diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java index 675d22f..7226c30 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java @@ -191,7 +191,7 @@ public class ElasticsearchDao implements IndexDao { } protected Optional<String> getIndexName(String guid, String sensorType) throws IOException { - return updateDao.getIndexName(guid, sensorType); + return updateDao.findIndexNameByGUID(guid, sensorType); } protected SearchResponse search(SearchRequest request, QueryBuilder queryBuilder) diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java index ba852aa..fa02f8d 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java @@ -17,18 +17,9 @@ */ package org.apache.metron.elasticsearch.dao; -import static org.apache.metron.indexing.dao.IndexDao.COMMENTS_FIELD; - -import java.io.IOException; -import java.lang.invoke.MethodHandles; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.stream.Collectors; - +import org.apache.metron.elasticsearch.bulk.ElasticsearchBulkDocumentWriter; +import org.apache.metron.elasticsearch.bulk.WriteFailure; +import org.apache.metron.elasticsearch.bulk.BulkDocumentWriterResults; import org.apache.metron.elasticsearch.client.ElasticsearchClient; import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; import org.apache.metron.indexing.dao.AccessConfig; @@ -36,84 +27,80 @@ import org.apache.metron.indexing.dao.search.AlertComment; import org.apache.metron.indexing.dao.update.CommentAddRemoveRequest; import org.apache.metron.indexing.dao.update.Document; import org.apache.metron.indexing.dao.update.UpdateDao; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.WriteRequest; -import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +import static org.apache.metron.indexing.dao.IndexDao.COMMENTS_FIELD; + +import static java.lang.String.format; + public class ElasticsearchUpdateDao implements UpdateDao { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private transient ElasticsearchClient client; private AccessConfig accessConfig; private ElasticsearchRetrieveLatestDao retrieveLatestDao; - private WriteRequest.RefreshPolicy refreshPolicy; + private ElasticsearchBulkDocumentWriter<Document> documentWriter; public ElasticsearchUpdateDao(ElasticsearchClient client, AccessConfig accessConfig, ElasticsearchRetrieveLatestDao searchDao) { - this.client = client; this.accessConfig = accessConfig; this.retrieveLatestDao = searchDao; - this.refreshPolicy = WriteRequest.RefreshPolicy.NONE; + this.documentWriter = new ElasticsearchBulkDocumentWriter<>(client) + .withRefreshPolicy(WriteRequest.RefreshPolicy.NONE); } @Override public Document update(Document update, Optional<String> index) throws IOException { - String indexPostfix = ElasticsearchUtils - .getIndexFormat(accessConfig.getGlobalConfigSupplier().get()).format(new Date()); - String sensorType = update.getSensorType(); - String indexName = getIndexName(update, index, indexPostfix); - - IndexRequest indexRequest = buildIndexRequest(update, sensorType, indexName); - try { - IndexResponse response = client.getHighLevelClient().index(indexRequest); - - ShardInfo shardInfo = response.getShardInfo(); - int failed = shardInfo.getFailed(); - if (failed > 0) { - throw new IOException( - "ElasticsearchDao index failed: " + Arrays.toString(shardInfo.getFailures())); - } - } catch (Exception e) { - throw new IOException(e.getMessage(), e); - } - return update; + Map<Document, Optional<String>> updates = new HashMap<>(); + updates.put(update, index); + + Map<Document, Optional<String>> results = batchUpdate(updates); + return results.keySet().iterator().next(); } @Override public Map<Document, Optional<String>> batchUpdate(Map<Document, Optional<String>> updates) throws IOException { - String indexPostfix = ElasticsearchUtils - .getIndexFormat(accessConfig.getGlobalConfigSupplier().get()).format(new Date()); - - BulkRequest bulkRequestBuilder = new BulkRequest(); - bulkRequestBuilder.setRefreshPolicy(refreshPolicy); - - // Get the indices we'll actually be using for each Document. - for (Map.Entry<Document, Optional<String>> updateEntry : updates.entrySet()) { - Document update = updateEntry.getKey(); - String sensorType = update.getSensorType(); - String indexName = getIndexName(update, updateEntry.getValue(), indexPostfix); - IndexRequest indexRequest = buildIndexRequest( - update, - sensorType, - indexName - ); - - bulkRequestBuilder.add(indexRequest); + Map<String, Object> globalConfig = accessConfig.getGlobalConfigSupplier().get(); + String indexPostfix = ElasticsearchUtils.getIndexFormat(globalConfig).format(new Date()); + + for (Map.Entry<Document, Optional<String>> entry : updates.entrySet()) { + Document document = entry.getKey(); + Optional<String> optionalIndex = entry.getValue(); + String indexName = optionalIndex.orElse(getIndexName(document, indexPostfix)); + documentWriter.addDocument(document, indexName); } - BulkResponse bulkResponse = client.getHighLevelClient().bulk(bulkRequestBuilder); - if (bulkResponse.hasFailures()) { - LOG.error("Bulk Request has failures: {}", bulkResponse.buildFailureMessage()); - throw new IOException( - "ElasticsearchDao upsert failed: " + bulkResponse.buildFailureMessage()); + // write the documents. if any document fails, raise an exception. + BulkDocumentWriterResults<Document> results = documentWriter.write(); + int failures = results.getFailures().size(); + if(failures > 0) { + int successes = results.getSuccesses().size(); + String msg = format("Failed to update all documents; %d successes, %d failures", successes, failures); + LOG.error(msg); + + // log each individual failure + for(WriteFailure<Document> failure: results.getFailures()) { + LOG.error(failure.getMessage(), failure.getCause()); + } + + // raise an exception using the first exception as the root cause, although there may be many + Throwable cause = results.getFailures().get(0).getCause(); + throw new IOException(msg, cause); } + return updates; } @@ -187,32 +174,19 @@ public class ElasticsearchUpdateDao implements UpdateDao { } public ElasticsearchUpdateDao withRefreshPolicy(WriteRequest.RefreshPolicy refreshPolicy) { - this.refreshPolicy = refreshPolicy; + documentWriter.withRefreshPolicy(refreshPolicy); return this; } - protected String getIndexName(Document update, Optional<String> index, String indexPostFix) throws IOException { - return index.orElse(getIndexName(update.getGuid(), update.getSensorType()) - .orElse(ElasticsearchUtils.getIndexName(update.getSensorType(), indexPostFix, null)) - ); + protected String getIndexName(Document update, String indexPostFix) throws IOException { + return findIndexNameByGUID(update.getGuid(), update.getSensorType()) + .orElse(ElasticsearchUtils.getIndexName(update.getSensorType(), indexPostFix, null)); } - protected Optional<String> getIndexName(String guid, String sensorType) throws IOException { - return retrieveLatestDao.searchByGuid(guid, - sensorType, - hit -> Optional.ofNullable(hit.getIndex()) - ); - } - - protected IndexRequest buildIndexRequest(Document update, String sensorType, String indexName) { - String type = sensorType + "_doc"; - Object ts = update.getTimestamp(); - IndexRequest indexRequest = new IndexRequest(indexName, type, update.getGuid()) - .source(update.getDocument()); - if (ts != null) { - indexRequest = indexRequest.timestamp(ts.toString()); - } - - return indexRequest; + protected Optional<String> findIndexNameByGUID(String guid, String sensorType) throws IOException { + return retrieveLatestDao.searchByGuid( + guid, + sensorType, + hit -> Optional.ofNullable(hit.getIndex())); } } diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java index fbdd4fe..a3459d8 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java @@ -17,21 +17,24 @@ */ package org.apache.metron.elasticsearch.writer; +import com.google.common.collect.Lists; import org.apache.metron.common.Constants; import org.apache.metron.common.configuration.writer.WriterConfiguration; import org.apache.metron.common.field.FieldNameConverter; import org.apache.metron.common.field.FieldNameConverters; import org.apache.metron.common.writer.BulkMessageWriter; import org.apache.metron.common.writer.BulkWriterResponse; +import org.apache.metron.elasticsearch.bulk.BulkDocumentWriter; +import org.apache.metron.elasticsearch.bulk.ElasticsearchBulkDocumentWriter; +import org.apache.metron.elasticsearch.bulk.WriteFailure; +import org.apache.metron.elasticsearch.bulk.WriteSuccess; +import org.apache.metron.elasticsearch.bulk.BulkDocumentWriterResults; import org.apache.metron.elasticsearch.client.ElasticsearchClient; import org.apache.metron.elasticsearch.client.ElasticsearchClientFactory; import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; +import org.apache.metron.stellar.common.utils.ConversionUtils; import org.apache.storm.task.TopologyContext; import org.apache.storm.tuple.Tuple; -import org.elasticsearch.action.bulk.BulkItemResponse; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.index.IndexRequest; import org.json.simple.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,11 +42,14 @@ import org.slf4j.LoggerFactory; import java.io.Serializable; import java.lang.invoke.MethodHandles; import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.Date; -import java.util.Iterator; import java.util.List; import java.util.Map; +import static java.lang.String.format; +import static org.apache.metron.stellar.common.Constants.Fields.TIMESTAMP; + /** * A {@link BulkMessageWriter} that writes messages to Elasticsearch. */ @@ -57,89 +63,110 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria private transient ElasticsearchClient client; /** + * Responsible for writing documents. + * + * <p>Uses a {@link TupleBasedDocument} to maintain the relationship between + * a {@link Tuple} and the document created from the contents of that tuple. If + * a document cannot be written, the associated tuple needs to be failed. + */ + private transient BulkDocumentWriter<TupleBasedDocument> documentWriter; + + /** * A simple data formatter used to build the appropriate Elasticsearch index name. */ private SimpleDateFormat dateFormat; - @Override public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration configurations) { - Map<String, Object> globalConfiguration = configurations.getGlobalConfig(); - client = ElasticsearchClientFactory.create(globalConfiguration); dateFormat = ElasticsearchUtils.getIndexFormat(globalConfiguration); + + // only create the document writer, if one does not already exist. useful for testing. + if(documentWriter == null) { + client = ElasticsearchClientFactory.create(globalConfiguration); + documentWriter = new ElasticsearchBulkDocumentWriter<>(client); + } } @Override - public BulkWriterResponse write(String sensorType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<JSONObject> messages) throws Exception { + public BulkWriterResponse write(String sensorType, + WriterConfiguration configurations, + Iterable<Tuple> tuplesIter, + List<JSONObject> messages) { // fetch the field name converter for this sensor type FieldNameConverter fieldNameConverter = FieldNameConverters.create(sensorType, configurations); + String indexPostfix = dateFormat.format(new Date()); + String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, configurations); + + // the number of tuples must match the number of messages + List<Tuple> tuples = Lists.newArrayList(tuplesIter); + int batchSize = tuples.size(); + if(messages.size() != batchSize) { + throw new IllegalStateException(format("Expect same number of tuples and messages; |tuples|=%d, |messages|=%d", + tuples.size(), messages.size())); + } - final String indexPostfix = dateFormat.format(new Date()); - BulkRequest bulkRequest = new BulkRequest(); - for(JSONObject message: messages) { - - JSONObject esDoc = new JSONObject(); - for(Object k : message.keySet()){ - copyField(k.toString(), message, esDoc, fieldNameConverter); - } - - String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, configurations); - IndexRequest indexRequest = new IndexRequest(indexName, sensorType + "_doc"); - indexRequest.source(esDoc.toJSONString()); - String guid = (String)esDoc.get(Constants.GUID); - if(guid != null) { - indexRequest.id(guid); - } - - Object ts = esDoc.get("timestamp"); - if(ts != null) { - indexRequest.timestamp(ts.toString()); - } - bulkRequest.add(indexRequest); + // create a document from each message + for(int i=0; i<tuples.size(); i++) { + JSONObject message = messages.get(i); + Tuple tuple = tuples.get(i); + TupleBasedDocument document = createDocument(message, tuple, sensorType, fieldNameConverter); + documentWriter.addDocument(document, indexName); } - BulkResponse bulkResponse = client.getHighLevelClient().bulk(bulkRequest); - return buildWriteReponse(tuples, bulkResponse); - } + // write the documents + BulkDocumentWriterResults<TupleBasedDocument> results = documentWriter.write(); - @Override - public String getName() { - return "elasticsearch"; + // build the response + BulkWriterResponse response = new BulkWriterResponse(); + for(WriteSuccess<TupleBasedDocument> success: results.getSuccesses()) { + response.addSuccess(success.getDocument().getTuple()); + } + for(WriteFailure<TupleBasedDocument> failure: results.getFailures()) { + response.addError(failure.getCause(), failure.getDocument().getTuple()); + } + return response; } - protected BulkWriterResponse buildWriteReponse(Iterable<Tuple> tuples, BulkResponse bulkResponse) throws Exception { - // Elasticsearch responses are in the same order as the request, giving us an implicit mapping with Tuples - BulkWriterResponse writerResponse = new BulkWriterResponse(); - if (bulkResponse.hasFailures()) { - Iterator<BulkItemResponse> respIter = bulkResponse.iterator(); - Iterator<Tuple> tupleIter = tuples.iterator(); - while (respIter.hasNext() && tupleIter.hasNext()) { - BulkItemResponse item = respIter.next(); - Tuple tuple = tupleIter.next(); - - if (item.isFailed()) { - writerResponse.addError(item.getFailure().getCause(), tuple); - } else { - writerResponse.addSuccess(tuple); - } - - // Should never happen, so fail the entire batch if it does. - if (respIter.hasNext() != tupleIter.hasNext()) { - throw new Exception(bulkResponse.buildFailureMessage()); - } - } + private TupleBasedDocument createDocument(JSONObject message, + Tuple tuple, + String sensorType, + FieldNameConverter fieldNameConverter) { + // transform the message fields to the source fields of the indexed document + JSONObject source = new JSONObject(); + for(Object k : message.keySet()){ + copyField(k.toString(), message, source, fieldNameConverter); + } + + // define the document id + String guid = ConversionUtils.convert(source.get(Constants.GUID), String.class); + if(guid == null) { + LOG.warn("Missing '{}' field; document ID will be auto-generated.", Constants.GUID); + } + + // define the document timestamp + Long timestamp = null; + Object value = source.get(TIMESTAMP.getName()); + if(value != null) { + timestamp = Long.parseLong(value.toString()); } else { - writerResponse.addAllSuccesses(tuples); + LOG.warn("Missing '{}' field; timestamp will be set to system time.", TIMESTAMP.getName()); } - return writerResponse; + return new TupleBasedDocument(source, guid, sensorType, timestamp, tuple); + } + + @Override + public String getName() { + return "elasticsearch"; } @Override public void close() throws Exception { - client.close(); + if(client != null) { + client.close(); + } } /** @@ -167,5 +194,13 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria // copy the field destination.put(destinationFieldName, source.get(sourceFieldName)); } + + /** + * Set the document writer. Primarily used for testing. + * @param documentWriter The {@link BulkDocumentWriter} to use. + */ + public void setDocumentWriter(BulkDocumentWriter<TupleBasedDocument> documentWriter) { + this.documentWriter = documentWriter; + } } diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/TupleBasedDocument.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/TupleBasedDocument.java new file mode 100644 index 0000000..ba44937 --- /dev/null +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/TupleBasedDocument.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.elasticsearch.writer; + +import org.apache.metron.indexing.dao.update.Document; +import org.apache.storm.tuple.Tuple; + +import java.util.Map; + +/** + * An {@link Document} that is created from the contents of a {@link Tuple}. + */ +public class TupleBasedDocument extends Document { + + private Tuple tuple; + + public TupleBasedDocument(Map<String, Object> document, + String guid, + String sensorType, + Long timestamp, + Tuple tuple) { + super(document, guid, sensorType, timestamp); + this.tuple = tuple; + } + + public Tuple getTuple() { + return tuple; + } +} diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriterTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriterTest.java new file mode 100644 index 0000000..b313811 --- /dev/null +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriterTest.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.elasticsearch.bulk; + +import org.apache.metron.common.Constants; +import org.apache.metron.elasticsearch.client.ElasticsearchClient; +import org.apache.metron.indexing.dao.update.Document; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.client.RestHighLevelClient; +import org.json.simple.JSONObject; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ElasticsearchBulkDocumentWriterTest { + + ElasticsearchBulkDocumentWriter<Document> writer; + ElasticsearchClient client; + RestHighLevelClient highLevelClient; + + @Before + public void setup() { + // mock Elasticsearch + highLevelClient = mock(RestHighLevelClient.class); + client = mock(ElasticsearchClient.class); + when(client.getHighLevelClient()).thenReturn(highLevelClient); + + writer = new ElasticsearchBulkDocumentWriter<>(client); + } + + @Test + public void testWriteSuccess() throws IOException { + setupElasticsearchToSucceed(); + + // write a document successfully + Document doc = document(message()); + String index = "bro_index"; + writer.addDocument(doc, index); + + BulkDocumentWriterResults<Document> results = writer.write(); + assertEquals(1, results.getSuccesses().size()); + assertEquals(0, results.getFailures().size()); + + WriteSuccess<Document> success = results.getSuccesses().get(0); + assertEquals(doc, success.getDocument()); + } + + @Test + public void testWriteFailure() throws IOException { + setupElasticsearchToFail(); + + // the document will fail to write + Document doc = document(message()); + String index = "bro_index"; + writer.addDocument(doc, index); + + BulkDocumentWriterResults<Document> results = writer.write(); + assertEquals(0, results.getSuccesses().size()); + assertEquals(1, results.getFailures().size()); + + WriteFailure<Document> failure = results.getFailures().get(0); + assertEquals(doc, failure.getDocument()); + assertEquals("error message", failure.getMessage()); + assertNotNull(failure.getCause()); + } + + @Test + public void testSizeWhenWriteSuccessful() throws IOException { + setupElasticsearchToSucceed(); + assertEquals(0, writer.size()); + + // add some documents to write + String index = "bro_index"; + writer.addDocument(document(message()), index); + writer.addDocument(document(message()), index); + writer.addDocument(document(message()), index); + writer.addDocument(document(message()), index); + writer.addDocument(document(message()), index); + assertEquals(5, writer.size()); + + // after the write, all documents should have been flushed + writer.write(); + assertEquals(0, writer.size()); + } + + @Test + public void testSizeWhenWriteFails() throws IOException { + setupElasticsearchToFail(); + assertEquals(0, writer.size()); + + // add some documents to write + String index = "bro_index"; + writer.addDocument(document(message()), index); + writer.addDocument(document(message()), index); + writer.addDocument(document(message()), index); + writer.addDocument(document(message()), index); + writer.addDocument(document(message()), index); + assertEquals(5, writer.size()); + + // after the write, all documents should have been flushed + writer.write(); + assertEquals(0, writer.size()); + } + + private void setupElasticsearchToFail() throws IOException { + // define the item failure + BulkItemResponse.Failure failure = mock(BulkItemResponse.Failure.class); + when(failure.getCause()).thenReturn(new Exception("test exception")); + when(failure.getMessage()).thenReturn("error message"); + + // define the item level response + BulkItemResponse itemResponse = mock(BulkItemResponse.class); + when(itemResponse.isFailed()).thenReturn(true); + when(itemResponse.getItemId()).thenReturn(0); + when(itemResponse.getFailure()).thenReturn(failure); + when(itemResponse.getFailureMessage()).thenReturn("error message"); + List<BulkItemResponse> itemsResponses = Collections.singletonList(itemResponse); + + // define the bulk response to indicate failure + BulkResponse response = mock(BulkResponse.class); + when(response.iterator()).thenReturn(itemsResponses.iterator()); + when(response.hasFailures()).thenReturn(true); + + // have the client return the mock response + when(highLevelClient.bulk(any(BulkRequest.class))).thenReturn(response); + } + + private void setupElasticsearchToSucceed() throws IOException { + // define the bulk response to indicate success + BulkResponse response = mock(BulkResponse.class); + when(response.hasFailures()).thenReturn(false); + + // have the client return the mock response + when(highLevelClient.bulk(any(BulkRequest.class))).thenReturn(response); + } + + private Document document(JSONObject message) { + String guid = UUID.randomUUID().toString(); + String sensorType = "bro"; + Long timestamp = System.currentTimeMillis(); + return new Document(message, guid, sensorType, timestamp); + } + + private JSONObject message() { + JSONObject message = new JSONObject(); + message.put(Constants.GUID, UUID.randomUUID().toString()); + message.put(Constants.Fields.TIMESTAMP.getName(), System.currentTimeMillis()); + message.put(Constants.Fields.SRC_ADDR.getName(), "192.168.1.1"); + return message; + } +} diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java index 227f5ef..dfdf88e 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java @@ -17,40 +17,11 @@ */ package org.apache.metron.elasticsearch.integration.components; -import static java.util.Arrays.asList; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.function.BooleanSupplier; - -import com.google.common.util.concurrent.Uninterruptibles; import org.apache.commons.io.FileUtils; -import org.apache.metron.common.Constants; -import org.apache.metron.common.utils.JSONUtils; -import org.apache.metron.elasticsearch.client.ElasticsearchClient; -import org.apache.metron.elasticsearch.client.ElasticsearchClientFactory; -import org.apache.metron.elasticsearch.dao.ElasticsearchColumnMetadataDao; import org.apache.metron.elasticsearch.dao.ElasticsearchDao; -import org.apache.metron.elasticsearch.dao.ElasticsearchRequestSubmitter; -import org.apache.metron.elasticsearch.dao.ElasticsearchRetrieveLatestDao; -import org.apache.metron.elasticsearch.dao.ElasticsearchSearchDao; -import org.apache.metron.elasticsearch.dao.ElasticsearchUpdateDao; -import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; import org.apache.metron.indexing.dao.AccessConfig; import org.apache.metron.indexing.dao.IndexDao; -import org.apache.metron.indexing.dao.search.InvalidSearchException; -import org.apache.metron.indexing.dao.search.SearchRequest; import org.apache.metron.indexing.dao.update.Document; -import org.apache.metron.indexing.dao.update.UpdateDao; import org.apache.metron.integration.InMemoryComponent; import org.apache.metron.integration.UnableToStartException; import org.apache.metron.stellar.common.utils.ConversionUtils; @@ -62,9 +33,6 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; -import org.elasticsearch.action.bulk.BulkRequestBuilder; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; @@ -76,11 +44,23 @@ import org.elasticsearch.node.NodeValidationException; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.search.SearchHit; import org.elasticsearch.transport.Netty4Plugin; -import org.json.simple.JSONArray; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; import org.json.simple.parser.ParseException; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static java.util.Arrays.asList; + public class ElasticSearchComponent implements InMemoryComponent { private static class Mapping { diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java index 6a3638b..e5e85b0 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java @@ -18,170 +18,290 @@ package org.apache.metron.elasticsearch.writer; -import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import org.apache.metron.common.Constants; +import org.apache.metron.common.configuration.writer.WriterConfiguration; +import org.apache.metron.common.writer.BulkWriterResponse; +import org.apache.metron.elasticsearch.bulk.BulkDocumentWriter; +import org.apache.metron.elasticsearch.bulk.BulkDocumentWriterResults; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.tuple.Tuple; +import org.json.simple.JSONObject; +import org.junit.Before; +import org.junit.Test; -import com.google.common.collect.ImmutableList; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; -import org.apache.metron.common.writer.BulkWriterResponse; -import org.apache.storm.tuple.Tuple; -import org.elasticsearch.action.bulk.BulkItemResponse; -import org.elasticsearch.action.bulk.BulkResponse; -import org.junit.Test; +import java.util.UUID; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class ElasticsearchWriterTest { - @Test - public void testSingleSuccesses() throws Exception { - Tuple tuple1 = mock(Tuple.class); - BulkResponse response = mock(BulkResponse.class); - when(response.hasFailures()).thenReturn(false); + Map stormConf; + TopologyContext topologyContext; + WriterConfiguration writerConfiguration; - BulkWriterResponse expected = new BulkWriterResponse(); - expected.addSuccess(tuple1); + @Before + public void setup() { + topologyContext = mock(TopologyContext.class); - ElasticsearchWriter esWriter = new ElasticsearchWriter(); - BulkWriterResponse actual = esWriter.buildWriteReponse(ImmutableList.of(tuple1), response); + writerConfiguration = mock(WriterConfiguration.class); + when(writerConfiguration.getGlobalConfig()).thenReturn(globals()); - assertEquals("Response should have no errors and single success", expected, actual); + stormConf = new HashMap(); } @Test - public void testMultipleSuccesses() throws Exception { - Tuple tuple1 = mock(Tuple.class); - Tuple tuple2 = mock(Tuple.class); - - BulkResponse response = mock(BulkResponse.class); - when(response.hasFailures()).thenReturn(false); - - BulkWriterResponse expected = new BulkWriterResponse(); - expected.addSuccess(tuple1); - expected.addSuccess(tuple2); - + public void shouldWriteSuccessfully() { + // create a tuple and a message associated with that tuple + List<Tuple> tuples = createTuples(1); + List<JSONObject> messages = createMessages(1); + + // create a document writer which will successfully write all + BulkDocumentWriterResults<TupleBasedDocument> results = new BulkDocumentWriterResults<>(); + results.addSuccess(createDocument(messages.get(0), tuples.get(0))); + BulkDocumentWriter<TupleBasedDocument> docWriter = mock(BulkDocumentWriter.class); + when(docWriter.write()).thenReturn(results); + + // attempt to write ElasticsearchWriter esWriter = new ElasticsearchWriter(); - BulkWriterResponse actual = esWriter.buildWriteReponse(ImmutableList.of(tuple1, tuple2), response); + esWriter.setDocumentWriter(docWriter); + esWriter.init(stormConf, topologyContext, writerConfiguration); + BulkWriterResponse response = esWriter.write("bro", writerConfiguration, tuples, messages); - assertEquals("Response should have no errors and two successes", expected, actual); + // response should only contain successes + assertFalse(response.hasErrors()); + assertTrue(response.getSuccesses().contains(tuples.get(0))); } @Test - public void testSingleFailure() throws Exception { - Tuple tuple1 = mock(Tuple.class); - - BulkResponse response = mock(BulkResponse.class); - when(response.hasFailures()).thenReturn(true); - - Exception e = new IllegalStateException(); - BulkItemResponse itemResponse = buildBulkItemFailure(e); - when(response.iterator()).thenReturn(ImmutableList.of(itemResponse).iterator()); - - BulkWriterResponse expected = new BulkWriterResponse(); - expected.addError(e, tuple1); - + public void shouldWriteManySuccessfully() { + // create a few tuples and the messages associated with the tuples + List<Tuple> tuples = createTuples(3); + List<JSONObject> messages = createMessages(3); + + // create a document writer which will successfully write all + BulkDocumentWriterResults<TupleBasedDocument> results = new BulkDocumentWriterResults<>(); + results.addSuccess(createDocument(messages.get(0), tuples.get(0))); + results.addSuccess(createDocument(messages.get(1), tuples.get(1))); + results.addSuccess(createDocument(messages.get(2), tuples.get(2))); + BulkDocumentWriter<TupleBasedDocument> docWriter = mock(BulkDocumentWriter.class); + when(docWriter.write()).thenReturn(results); + + // attempt to write ElasticsearchWriter esWriter = new ElasticsearchWriter(); - BulkWriterResponse actual = esWriter.buildWriteReponse(ImmutableList.of(tuple1), response); - - assertEquals("Response should have one error and zero successes", expected, actual); + esWriter.setDocumentWriter(docWriter); + esWriter.init(stormConf, topologyContext, writerConfiguration); + BulkWriterResponse response = esWriter.write("bro", writerConfiguration, tuples, messages); + + // response should only contain successes + assertFalse(response.hasErrors()); + assertTrue(response.getSuccesses().contains(tuples.get(0))); + assertTrue(response.getSuccesses().contains(tuples.get(1))); + assertTrue(response.getSuccesses().contains(tuples.get(2))); } @Test - public void testTwoSameFailure() throws Exception { - Tuple tuple1 = mock(Tuple.class); - Tuple tuple2 = mock(Tuple.class); - - BulkResponse response = mock(BulkResponse.class); - when(response.hasFailures()).thenReturn(true); - - Exception e = new IllegalStateException(); - - BulkItemResponse itemResponse = buildBulkItemFailure(e); - BulkItemResponse itemResponse2 = buildBulkItemFailure(e); - - when(response.iterator()).thenReturn(ImmutableList.of(itemResponse, itemResponse2).iterator()); - - BulkWriterResponse expected = new BulkWriterResponse(); - expected.addError(e, tuple1); - expected.addError(e, tuple2); - + public void shouldHandleWriteFailure() { + // create a tuple and a message associated with that tuple + List<Tuple> tuples = createTuples(1); + List<JSONObject> messages = createMessages(1); + Exception cause = new Exception(); + + // create a document writer which will fail all writes + BulkDocumentWriterResults<TupleBasedDocument> results = new BulkDocumentWriterResults<>(); + results.addFailure(createDocument(messages.get(0), tuples.get(0)), cause, "error"); + BulkDocumentWriter<TupleBasedDocument> docWriter = mock(BulkDocumentWriter.class); + when(docWriter.write()).thenReturn(results); + + // attempt to write ElasticsearchWriter esWriter = new ElasticsearchWriter(); - BulkWriterResponse actual = esWriter.buildWriteReponse(ImmutableList.of(tuple1, tuple2), response); - - assertEquals("Response should have two errors and no successes", expected, actual); - - // Ensure the errors actually get collapsed together - Map<Throwable, Collection<Tuple>> actualErrors = actual.getErrors(); - HashMap<Throwable, Collection<Tuple>> expectedErrors = new HashMap<>(); - expectedErrors.put(e, ImmutableList.of(tuple1, tuple2)); - assertEquals("Errors should have collapsed together", expectedErrors, actualErrors); + esWriter.setDocumentWriter(docWriter); + esWriter.init(stormConf, topologyContext, writerConfiguration); + BulkWriterResponse response = esWriter.write("bro", writerConfiguration, tuples, messages); + + // the writer response should only contain failures + assertEquals(0, response.getSuccesses().size()); + assertEquals(1, response.getErrors().size()); + Collection<Tuple> errors = response.getErrors().get(cause); + assertTrue(errors.contains(tuples.get(0))); } @Test - public void testTwoDifferentFailure() throws Exception { - Tuple tuple1 = mock(Tuple.class); - Tuple tuple2 = mock(Tuple.class); + public void shouldHandleManyWriteFailures() { + // create a few tuples and the messages associated with the tuples + int count = 3; + List<Tuple> tuples = createTuples(count); + List<JSONObject> messages = createMessages(count); + Exception cause = new Exception(); + + // create a document writer which will fail all writes + BulkDocumentWriterResults<TupleBasedDocument> results = new BulkDocumentWriterResults<>(); + results.addFailure(createDocument(messages.get(0), tuples.get(0)), cause, "error"); + results.addFailure(createDocument(messages.get(1), tuples.get(1)), cause, "error"); + results.addFailure(createDocument(messages.get(2), tuples.get(2)), cause, "error"); + BulkDocumentWriter<TupleBasedDocument> docWriter = mock(BulkDocumentWriter.class); + when(docWriter.write()).thenReturn(results); + + // attempt to write + ElasticsearchWriter esWriter = new ElasticsearchWriter(); + esWriter.setDocumentWriter(docWriter); + esWriter.init(stormConf, topologyContext, writerConfiguration); + BulkWriterResponse response = esWriter.write("bro", writerConfiguration, tuples, messages); + + // the writer response should only contain failures + assertEquals(0, response.getSuccesses().size()); + assertEquals(1, response.getErrors().size()); + Collection<Tuple> errors = response.getErrors().get(cause); + assertTrue(errors.contains(tuples.get(0))); + assertTrue(errors.contains(tuples.get(1))); + assertTrue(errors.contains(tuples.get(2))); + } - BulkResponse response = mock(BulkResponse.class); - when(response.hasFailures()).thenReturn(true); + @Test + public void shouldHandlePartialFailures() { + // create a few tuples and the messages associated with the tuples + int count = 2; + List<Tuple> tuples = createTuples(count); + List<JSONObject> messages = createMessages(count); + Exception cause = new Exception(); + + // create a document writer that will fail one and succeed the other + BulkDocumentWriterResults<TupleBasedDocument> results = new BulkDocumentWriterResults<>(); + results.addFailure(createDocument(messages.get(0), tuples.get(0)), cause, "error"); + results.addSuccess(createDocument(messages.get(1), tuples.get(1))); + BulkDocumentWriter<TupleBasedDocument> docWriter = mock(BulkDocumentWriter.class); + when(docWriter.write()).thenReturn(results); + + // attempt to write + ElasticsearchWriter esWriter = new ElasticsearchWriter(); + esWriter.setDocumentWriter(docWriter); + esWriter.init(stormConf, topologyContext, writerConfiguration); + BulkWriterResponse response = esWriter.write("bro", writerConfiguration, tuples, messages); + + // response should contain some successes and some failures + assertEquals(1, response.getSuccesses().size()); + assertEquals(1, response.getErrors().size()); + assertTrue(response.getErrors().get(cause).contains(tuples.get(0))); + assertTrue(response.getSuccesses().contains(tuples.get(1))); + } - Exception e = new IllegalStateException("Cause"); - Exception e2 = new IllegalStateException("Different Cause"); - BulkItemResponse itemResponse = buildBulkItemFailure(e); - BulkItemResponse itemResponse2 = buildBulkItemFailure(e2); + @Test(expected = IllegalStateException.class) + public void shouldCheckIfNumberOfMessagesMatchNumberOfTuples() { + ElasticsearchWriter esWriter = new ElasticsearchWriter(); + esWriter.setDocumentWriter(mock(BulkDocumentWriter.class)); + esWriter.init(stormConf, topologyContext, writerConfiguration); - when(response.iterator()).thenReturn(ImmutableList.of(itemResponse, itemResponse2).iterator()); + // there are 5 tuples and only 1 message; there should be 5 messages to match the number of tuples + List<Tuple> tuples = createTuples(5); + List<JSONObject> messages = createMessages(1); - BulkWriterResponse expected = new BulkWriterResponse(); - expected.addError(e, tuple1); - expected.addError(e2, tuple2); + esWriter.write("bro", writerConfiguration, tuples, messages); + fail("expected exception"); + } + @Test + public void shouldWriteSuccessfullyWhenMessageTimestampIsString() { + List<Tuple> tuples = createTuples(1); + List<JSONObject> messages = createMessages(1); + + // the timestamp is a String, rather than a Long + messages.get(0).put(Constants.Fields.TIMESTAMP.getName(), new Long(System.currentTimeMillis()).toString()); + + // create the document + JSONObject message = messages.get(0); + String timestamp = (String) message.get(Constants.Fields.TIMESTAMP.getName()); + String guid = (String) message.get(Constants.GUID); + String sensorType = (String) message.get(Constants.SENSOR_TYPE); + TupleBasedDocument document = new TupleBasedDocument(message, guid, sensorType, Long.parseLong(timestamp), tuples.get(0)); + + // create a document writer which will successfully write that document + BulkDocumentWriterResults<TupleBasedDocument> results = new BulkDocumentWriterResults<>(); + results.addSuccess(document); + BulkDocumentWriter<TupleBasedDocument> docWriter = mock(BulkDocumentWriter.class); + when(docWriter.write()).thenReturn(results); + + // attempt to write ElasticsearchWriter esWriter = new ElasticsearchWriter(); - BulkWriterResponse actual = esWriter.buildWriteReponse(ImmutableList.of(tuple1, tuple2), response); + esWriter.setDocumentWriter(docWriter); + esWriter.init(stormConf, topologyContext, writerConfiguration); + BulkWriterResponse response = esWriter.write("bro", writerConfiguration, tuples, messages); - assertEquals("Response should have two errors and no successes", expected, actual); - - // Ensure the errors did not get collapsed together - Map<Throwable, Collection<Tuple>> actualErrors = actual.getErrors(); - HashMap<Throwable, Collection<Tuple>> expectedErrors = new HashMap<>(); - expectedErrors.put(e, ImmutableList.of(tuple1)); - expectedErrors.put(e2, ImmutableList.of(tuple2)); - assertEquals("Errors should not have collapsed together", expectedErrors, actualErrors); + // response should only contain successes + assertFalse(response.hasErrors()); + assertTrue(response.getSuccesses().contains(tuples.get(0))); } @Test - public void testSuccessAndFailure() throws Exception { - Tuple tuple1 = mock(Tuple.class); - Tuple tuple2 = mock(Tuple.class); + public void shouldWriteSuccessfullyWhenMissingGUID() { + // create a tuple and a message associated with that tuple + List<Tuple> tuples = createTuples(1); + List<JSONObject> messages = createMessages(1); - BulkResponse response = mock(BulkResponse.class); - when(response.hasFailures()).thenReturn(true); + // remove the GUID from the message + assertNotNull(messages.get(0).remove(Constants.GUID)); - Exception e = new IllegalStateException("Cause"); - BulkItemResponse itemResponse = buildBulkItemFailure(e); + // create a document writer which will successfully write all + BulkDocumentWriterResults<TupleBasedDocument> results = new BulkDocumentWriterResults<>(); + results.addSuccess(createDocument(messages.get(0), tuples.get(0))); + BulkDocumentWriter<TupleBasedDocument> docWriter = mock(BulkDocumentWriter.class); + when(docWriter.write()).thenReturn(results); - BulkItemResponse itemResponse2 = mock(BulkItemResponse.class); - when(itemResponse2.isFailed()).thenReturn(false); + // attempt to write + ElasticsearchWriter esWriter = new ElasticsearchWriter(); + esWriter.setDocumentWriter(docWriter); + esWriter.init(stormConf, topologyContext, writerConfiguration); + BulkWriterResponse response = esWriter.write("bro", writerConfiguration, tuples, messages); - when(response.iterator()).thenReturn(ImmutableList.of(itemResponse, itemResponse2).iterator()); + // response should only contain successes + assertFalse(response.hasErrors()); + assertTrue(response.getSuccesses().contains(tuples.get(0))); + } - BulkWriterResponse expected = new BulkWriterResponse(); - expected.addError(e, tuple1); - expected.addSuccess(tuple2); + private TupleBasedDocument createDocument(JSONObject message, Tuple tuple) { + Long timestamp = (Long) message.get(Constants.Fields.TIMESTAMP.getName()); + String guid = (String) message.get(Constants.GUID); + String sensorType = (String) message.get(Constants.SENSOR_TYPE); + return new TupleBasedDocument(message, guid, sensorType, timestamp, tuple); + } - ElasticsearchWriter esWriter = new ElasticsearchWriter(); - BulkWriterResponse actual = esWriter.buildWriteReponse(ImmutableList.of(tuple1, tuple2), response); + private JSONObject message() { + JSONObject message = new JSONObject(); + message.put(Constants.GUID, UUID.randomUUID().toString()); + message.put(Constants.Fields.TIMESTAMP.getName(), System.currentTimeMillis()); + message.put(Constants.Fields.SRC_ADDR.getName(), "192.168.1.1"); + message.put(Constants.SENSOR_TYPE, "sensor"); + return message; + } + + private Map<String, Object> globals() { + Map<String, Object> globals = new HashMap<>(); + globals.put("es.date.format", "yyyy.MM.dd.HH"); + return globals; + } - assertEquals("Response should have one error and one success", expected, actual); + private List<Tuple> createTuples(int count) { + List<Tuple> tuples = new ArrayList<>(); + for(int i=0; i<count; i++) { + tuples.add(mock(Tuple.class)); + } + return tuples; } - private BulkItemResponse buildBulkItemFailure(Exception e) { - BulkItemResponse itemResponse = mock(BulkItemResponse.class); - when(itemResponse.isFailed()).thenReturn(true); - BulkItemResponse.Failure failure = mock(BulkItemResponse.Failure.class); - when(itemResponse.getFailure()).thenReturn(failure); - when(failure.getCause()).thenReturn(e); - return itemResponse; + private List<JSONObject> createMessages(int count) { + List<JSONObject> messages = new ArrayList<>(); + for(int i=0; i<count; i++) { + messages.add(message()); + } + return messages; } } diff --git a/metron-platform/metron-indexing/src/test/resources/log4j.properties b/metron-platform/metron-indexing/src/test/resources/log4j.properties new file mode 100644 index 0000000..e69de29