nickwallen commented on a change in pull request #1254: METRON-1849 Elasticsearch Index Write Functionality Should be Shared URL: https://github.com/apache/metron/pull/1254#discussion_r240605171
########## File path: metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java ########## @@ -17,103 +17,90 @@ */ package org.apache.metron.elasticsearch.dao; -import static org.apache.metron.indexing.dao.IndexDao.COMMENTS_FIELD; - -import java.io.IOException; -import java.lang.invoke.MethodHandles; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.stream.Collectors; - +import org.apache.metron.elasticsearch.bulk.ElasticsearchBulkDocumentWriter; +import org.apache.metron.elasticsearch.bulk.WriteFailure; +import org.apache.metron.elasticsearch.bulk.BulkDocumentWriterResults; import org.apache.metron.elasticsearch.client.ElasticsearchClient; import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; import org.apache.metron.indexing.dao.AccessConfig; import org.apache.metron.indexing.dao.search.AlertComment; import org.apache.metron.indexing.dao.update.CommentAddRemoveRequest; import org.apache.metron.indexing.dao.update.Document; import org.apache.metron.indexing.dao.update.UpdateDao; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.WriteRequest; -import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +import static org.apache.metron.indexing.dao.IndexDao.COMMENTS_FIELD; + +import static java.lang.String.format; + public class 
ElasticsearchUpdateDao implements UpdateDao { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private transient ElasticsearchClient client; private AccessConfig accessConfig; private ElasticsearchRetrieveLatestDao retrieveLatestDao; - private WriteRequest.RefreshPolicy refreshPolicy; + private ElasticsearchBulkDocumentWriter<Document> documentWriter; public ElasticsearchUpdateDao(ElasticsearchClient client, AccessConfig accessConfig, ElasticsearchRetrieveLatestDao searchDao) { - this.client = client; this.accessConfig = accessConfig; this.retrieveLatestDao = searchDao; - this.refreshPolicy = WriteRequest.RefreshPolicy.NONE; + this.documentWriter = new ElasticsearchBulkDocumentWriter<>(client) + .withRefreshPolicy(WriteRequest.RefreshPolicy.NONE); } @Override public Document update(Document update, Optional<String> index) throws IOException { - String indexPostfix = ElasticsearchUtils - .getIndexFormat(accessConfig.getGlobalConfigSupplier().get()).format(new Date()); - String sensorType = update.getSensorType(); - String indexName = getIndexName(update, index, indexPostfix); - - IndexRequest indexRequest = buildIndexRequest(update, sensorType, indexName); - try { - IndexResponse response = client.getHighLevelClient().index(indexRequest); - - ShardInfo shardInfo = response.getShardInfo(); - int failed = shardInfo.getFailed(); - if (failed > 0) { - throw new IOException( - "ElasticsearchDao index failed: " + Arrays.toString(shardInfo.getFailures())); - } - } catch (Exception e) { - throw new IOException(e.getMessage(), e); - } - return update; + Map<Document, Optional<String>> updates = new HashMap<>(); + updates.put(update, index); + + Map<Document, Optional<String>> results = batchUpdate(updates); + return results.keySet().iterator().next(); } @Override public Map<Document, Optional<String>> batchUpdate(Map<Document, Optional<String>> updates) throws IOException { - String indexPostfix = ElasticsearchUtils - 
.getIndexFormat(accessConfig.getGlobalConfigSupplier().get()).format(new Date()); - - BulkRequest bulkRequestBuilder = new BulkRequest(); - bulkRequestBuilder.setRefreshPolicy(refreshPolicy); - - // Get the indices we'll actually be using for each Document. - for (Map.Entry<Document, Optional<String>> updateEntry : updates.entrySet()) { - Document update = updateEntry.getKey(); - String sensorType = update.getSensorType(); - String indexName = getIndexName(update, updateEntry.getValue(), indexPostfix); - IndexRequest indexRequest = buildIndexRequest( - update, - sensorType, - indexName - ); - - bulkRequestBuilder.add(indexRequest); + Map<String, Object> globalConfig = accessConfig.getGlobalConfigSupplier().get(); + String indexPostfix = ElasticsearchUtils.getIndexFormat(globalConfig).format(new Date()); + + for (Map.Entry<Document, Optional<String>> entry : updates.entrySet()) { + Document document = entry.getKey(); + Optional<String> optionalIndex = entry.getValue(); + String indexName = optionalIndex.orElse(getIndexName(document, indexPostfix)); + documentWriter.addDocument(document, indexName); } - BulkResponse bulkResponse = client.getHighLevelClient().bulk(bulkRequestBuilder); - if (bulkResponse.hasFailures()) { - LOG.error("Bulk Request has failures: {}", bulkResponse.buildFailureMessage()); - throw new IOException( - "ElasticsearchDao upsert failed: " + bulkResponse.buildFailureMessage()); + // write the documents. if any document fails, raise an exception. 
+ BulkDocumentWriterResults<Document> results = documentWriter.write(); + int failures = results.getFailures().size(); + if(failures > 0) { + int successes = results.getSuccesses().size(); + String msg = format("Failed to update all documents; %d successes, %d failures", successes, failures); + LOG.error(msg); + + // log each individual failure + for(WriteFailure<Document> failure: results.getFailures()) { + LOG.error(failure.getMessage(), failure.getCause()); + } + + // raise an exception + Throwable cause = results.getFailures().get(0).getCause(); Review comment: The current behavior is for `ElasticsearchUpdateDao` to throw an exception if any of the writes fail. Inside this `if` statement we know there is at least one failure, and possibly many more. Only one exception can be attached as the root cause, so I simply use the first one; every individual failure has already been logged by the loop above, so no information is lost. Does that make sense? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services