nickwallen commented on a change in pull request #1254: METRON-1849 
Elasticsearch Index Write Functionality Should be Shared
URL: https://github.com/apache/metron/pull/1254#discussion_r240605171
 
 

 ##########
 File path: 
metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java
 ##########
 @@ -17,103 +17,90 @@
  */
 package org.apache.metron.elasticsearch.dao;
 
-import static org.apache.metron.indexing.dao.IndexDao.COMMENTS_FIELD;
-
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
+import org.apache.metron.elasticsearch.bulk.ElasticsearchBulkDocumentWriter;
+import org.apache.metron.elasticsearch.bulk.WriteFailure;
+import org.apache.metron.elasticsearch.bulk.BulkDocumentWriterResults;
 import org.apache.metron.elasticsearch.client.ElasticsearchClient;
 import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.search.AlertComment;
 import org.apache.metron.indexing.dao.update.CommentAddRemoveRequest;
 import org.apache.metron.indexing.dao.update.Document;
 import org.apache.metron.indexing.dao.update.UpdateDao;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.support.WriteRequest;
-import 
org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.metron.indexing.dao.IndexDao.COMMENTS_FIELD;
+
+import static java.lang.String.format;
+
 public class ElasticsearchUpdateDao implements UpdateDao {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private transient ElasticsearchClient client;
   private AccessConfig accessConfig;
   private ElasticsearchRetrieveLatestDao retrieveLatestDao;
-  private WriteRequest.RefreshPolicy refreshPolicy;
+  private ElasticsearchBulkDocumentWriter<Document> documentWriter;
 
   public ElasticsearchUpdateDao(ElasticsearchClient client,
       AccessConfig accessConfig,
       ElasticsearchRetrieveLatestDao searchDao) {
-    this.client = client;
     this.accessConfig = accessConfig;
     this.retrieveLatestDao = searchDao;
-    this.refreshPolicy = WriteRequest.RefreshPolicy.NONE;
+    this.documentWriter = new ElasticsearchBulkDocumentWriter<>(client)
+            .withRefreshPolicy(WriteRequest.RefreshPolicy.NONE);
   }
 
   @Override
   public Document update(Document update, Optional<String> index) throws 
IOException {
-    String indexPostfix = ElasticsearchUtils
-        
.getIndexFormat(accessConfig.getGlobalConfigSupplier().get()).format(new 
Date());
-    String sensorType = update.getSensorType();
-    String indexName = getIndexName(update, index, indexPostfix);
-
-    IndexRequest indexRequest = buildIndexRequest(update, sensorType, 
indexName);
-    try {
-      IndexResponse response = client.getHighLevelClient().index(indexRequest);
-
-      ShardInfo shardInfo = response.getShardInfo();
-      int failed = shardInfo.getFailed();
-      if (failed > 0) {
-        throw new IOException(
-            "ElasticsearchDao index failed: " + 
Arrays.toString(shardInfo.getFailures()));
-      }
-    } catch (Exception e) {
-      throw new IOException(e.getMessage(), e);
-    }
-    return update;
+    Map<Document, Optional<String>> updates = new HashMap<>();
+    updates.put(update, index);
+
+    Map<Document, Optional<String>> results = batchUpdate(updates);
+    return results.keySet().iterator().next();
   }
 
   @Override
   public Map<Document, Optional<String>> batchUpdate(Map<Document, 
Optional<String>> updates) throws IOException {
-    String indexPostfix = ElasticsearchUtils
-        
.getIndexFormat(accessConfig.getGlobalConfigSupplier().get()).format(new 
Date());
-
-    BulkRequest bulkRequestBuilder = new BulkRequest();
-    bulkRequestBuilder.setRefreshPolicy(refreshPolicy);
-
-    // Get the indices we'll actually be using for each Document.
-    for (Map.Entry<Document, Optional<String>> updateEntry : 
updates.entrySet()) {
-      Document update = updateEntry.getKey();
-      String sensorType = update.getSensorType();
-      String indexName = getIndexName(update, updateEntry.getValue(), 
indexPostfix);
-      IndexRequest indexRequest = buildIndexRequest(
-          update,
-          sensorType,
-          indexName
-      );
-
-      bulkRequestBuilder.add(indexRequest);
+    Map<String, Object> globalConfig = 
accessConfig.getGlobalConfigSupplier().get();
+    String indexPostfix = 
ElasticsearchUtils.getIndexFormat(globalConfig).format(new Date());
+
+    for (Map.Entry<Document, Optional<String>> entry : updates.entrySet()) {
+      Document document = entry.getKey();
+      Optional<String> optionalIndex = entry.getValue();
+      String indexName = optionalIndex.orElse(getIndexName(document, 
indexPostfix));
+      documentWriter.addDocument(document, indexName);
     }
 
-    BulkResponse bulkResponse = 
client.getHighLevelClient().bulk(bulkRequestBuilder);
-    if (bulkResponse.hasFailures()) {
-      LOG.error("Bulk Request has failures: {}", 
bulkResponse.buildFailureMessage());
-      throw new IOException(
-          "ElasticsearchDao upsert failed: " + 
bulkResponse.buildFailureMessage());
+    // write the documents. if any document fails, raise an exception.
+    BulkDocumentWriterResults<Document> results = documentWriter.write();
+    int failures = results.getFailures().size();
+    if(failures > 0) {
+      int successes = results.getSuccesses().size();
+      String msg = format("Failed to update all documents; %d successes, %d 
failures", successes, failures);
+      LOG.error(msg);
+
+      // log each individual failure
+      for(WriteFailure<Document> failure: results.getFailures()) {
+        LOG.error(failure.getMessage(), failure.getCause());
+      }
+
+      // raise an exception
+      Throwable cause = results.getFailures().get(0).getCause();
 
 Review comment:
   The current behavior is for the `ElasticsearchUpdateDao` to throw an 
exception if any of the writes fails.  Inside this `if` statement, we know 
there is at least one write failure, and possibly many more.  I need to choose 
one of those failures' exceptions as the root cause of the exception that is 
thrown, so I simply use the first one.  Does that make sense?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to