asfgit closed pull request #1254: METRON-1849 Elasticsearch Index Write Functionality Should be Shared
URL: https://github.com/apache/metron/pull/1254

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is displayed below for the sake of provenance:

diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/BulkDocumentWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/BulkDocumentWriter.java
new file mode 100644
index 0000000000..34f543ec88
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/BulkDocumentWriter.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.elasticsearch.bulk;
+
+import org.apache.metron.indexing.dao.update.Document;
+
+/**
+ * Writes documents to an index in bulk.
+ *
+ * @param <D> The type of document to write.
+ */
+public interface BulkDocumentWriter<D extends Document> {
+
+    /**
+     * Add a document to the batch.
+     * @param document The document to write.
+     * @param index The name of the index to write to.
+     */
+    void addDocument(D document, String index);
+
+    /**
+     * @return The number of documents waiting to be written.
+     */
+    int size();
+
+    /**
+     * Write all documents in the batch.
+     */
+    BulkDocumentWriterResults<D> write();
+}
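
For context, a minimal sketch of how a caller might use this interface (the
writer instance, document list, index name, and logger below are illustrative
assumptions, not part of this diff):

    // 'writer' is any BulkDocumentWriter<Document> implementation
    for (Document doc : docs) {
        writer.addDocument(doc, "bro_index_2018.10.05");
    }
    BulkDocumentWriterResults<Document> results = writer.write();
    // the caller decides how to handle per-document failures
    for (WriteFailure<Document> failure : results.getFailures()) {
        LOG.error("Failed to write; guid={}", failure.getDocument().getGuid(), failure.getCause());
    }
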
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/BulkDocumentWriterResults.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/BulkDocumentWriterResults.java
new file mode 100644
index 0000000000..90e5ce31f1
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/BulkDocumentWriterResults.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.elasticsearch.bulk;
+
+import org.apache.metron.indexing.dao.update.Document;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The result of writing documents in bulk using a {@link BulkDocumentWriter}.
+ * @param <D> The type of documents to write.
+ */
+public class BulkDocumentWriterResults<D extends Document> {
+
+  private List<WriteSuccess<D>> successes;
+  private List<WriteFailure<D>> failures;
+
+  public BulkDocumentWriterResults() {
+    this.successes = new ArrayList<>();
+    this.failures = new ArrayList<>();
+  }
+
+  public void add(WriteSuccess<D> success) {
+    this.successes.add(success);
+  }
+
+  public void addSuccess(D success) {
+    add(new WriteSuccess<D>(success));
+  }
+
+  public void addSuccesses(List<D> successes) {
+    for(D success: successes) {
+      addSuccess(success);
+    }
+  }
+
+  public List<WriteSuccess<D>> getSuccesses() {
+    return successes;
+  }
+
+  public void add(WriteFailure<D> failure) {
+    this.failures.add(failure);
+  }
+
+  public void addFailure(D document, Throwable cause, String message) {
+    add(new WriteFailure<>(document, cause, message));
+  }
+
+  public List<WriteFailure<D>> getFailures() {
+    return failures;
+  }
+}
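
As a small illustration (the documents 'ok' and 'bad' are assumed), successes
and failures accumulate independently, which is what allows a bulk write to
report a partial failure:

    BulkDocumentWriterResults<Document> results = new BulkDocumentWriterResults<>();
    results.addSuccess(ok);
    results.addFailure(bad, new IOException("mapping conflict"), "mapping conflict");
    // one success and one failure coexist in the same result
    int successes = results.getSuccesses().size();   // 1
    int failures = results.getFailures().size();     // 1
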
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriter.java
new file mode 100644
index 0000000000..9e6e568ea7
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriter.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.elasticsearch.bulk;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
+import org.apache.metron.indexing.dao.update.Document;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.support.WriteRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Writes documents to an Elasticsearch index in bulk.
+ *
+ * @param <D> The type of document to write.
+ */
+public class ElasticsearchBulkDocumentWriter<D extends Document> implements BulkDocumentWriter<D> {
+
+    /**
+     * A {@link Document} along with the index it will be written to.
+     */
+    private class Indexable {
+        D document;
+        String index;
+
+        public Indexable(D document, String index) {
+            this.document = document;
+            this.index = index;
+        }
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+    private ElasticsearchClient client;
+    private List<Indexable> documents;
+    private WriteRequest.RefreshPolicy refreshPolicy;
+
+    public ElasticsearchBulkDocumentWriter(ElasticsearchClient client) {
+        this.client = client;
+        this.documents = new ArrayList<>();
+        this.refreshPolicy = WriteRequest.RefreshPolicy.NONE;
+    }
+
+    @Override
+    public void addDocument(D document, String indexName) {
+        documents.add(new Indexable(document, indexName));
+        LOG.debug("Adding document to batch; document={}, index={}", document, indexName);
+    }
+
+    @Override
+    public BulkDocumentWriterResults<D> write() {
+        BulkDocumentWriterResults<D> results = new BulkDocumentWriterResults<>();
+        try {
+            // create an index request for each document
+            BulkRequest bulkRequest = new BulkRequest();
+            bulkRequest.setRefreshPolicy(refreshPolicy);
+            for(Indexable doc: documents) {
+                DocWriteRequest request = createRequest(doc.document, doc.index);
+                bulkRequest.add(request);
+            }
+
+            // submit the request and handle the response
+            BulkResponse bulkResponse = client.getHighLevelClient().bulk(bulkRequest);
+            handleBulkResponse(bulkResponse, documents, results);
+
+        } catch(IOException e) {
+            // assume all documents have failed
+            for(Indexable indexable: documents) {
+                D failed = indexable.document;
+                results.addFailure(failed, e, ExceptionUtils.getRootCauseMessage(e));
+            }
+            LOG.error("Failed to submit bulk request; all documents failed", e);
+
+        } finally {
+            // flush all documents no matter which ones succeeded or failed
+            documents.clear();
+        }
+
+        LOG.debug("Wrote document(s) to Elasticsearch; batchSize={}, success={}, failed={}",
+                results.getSuccesses().size() + results.getFailures().size(), results.getSuccesses().size(), results.getFailures().size());
+        return results;
+    }
+
+    @Override
+    public int size() {
+        return documents.size();
+    }
+
+    public ElasticsearchBulkDocumentWriter<D> withRefreshPolicy(WriteRequest.RefreshPolicy refreshPolicy) {
+        this.refreshPolicy = refreshPolicy;
+        return this;
+    }
+
+    private IndexRequest createRequest(D document, String index) {
+        if(document.getTimestamp() == null) {
+            throw new IllegalArgumentException("Document must contain the timestamp");
+        }
+        return new IndexRequest()
+                .source(document.getDocument())
+                .type(document.getSensorType() + "_doc")
+                .id(document.getGuid())
+                .index(index)
+                .timestamp(document.getTimestamp().toString());
+    }
+
+    /**
+     * Handles the {@link BulkResponse} received from Elasticsearch.
+     * @param bulkResponse The response received from Elasticsearch.
+     * @param documents The documents included in the bulk request.
+     * @param results The writer results.
+     */
+    private void handleBulkResponse(BulkResponse bulkResponse, List<Indexable> documents, BulkDocumentWriterResults<D> results) {
+        if (bulkResponse.hasFailures()) {
+
+            // interrogate the response to distinguish between those that succeeded and those that failed
+            for(BulkItemResponse response: bulkResponse) {
+                if(response.isFailed()) {
+                    // request failed
+                    D failed = getDocument(response.getItemId());
+                    Exception cause = response.getFailure().getCause();
+                    String message = response.getFailureMessage();
+                    results.addFailure(failed, cause, message);
+
+                } else {
+                    // request succeeded
+                    D success = getDocument(response.getItemId());
+                    results.addSuccess(success);
+                }
+            }
+        } else {
+            // all requests succeeded
+            for(Indexable success: documents) {
+                results.addSuccess(success.document);
+            }
+        }
+    }
+
+    private D getDocument(int index) {
+        return documents.get(index).document;
+    }
+}
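
As a usage illustration (the client construction is assumed; it is not shown in
this diff), the concrete writer wraps an ElasticsearchClient and can be tuned
with a refresh policy before writing:

    // a minimal sketch; 'client' is an ElasticsearchClient created elsewhere
    ElasticsearchBulkDocumentWriter<Document> writer =
            new ElasticsearchBulkDocumentWriter<Document>(client)
                    .withRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
    writer.addDocument(document, "bro_index");
    BulkDocumentWriterResults<Document> results = writer.write();
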
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/WriteFailure.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/WriteFailure.java
new file mode 100644
index 0000000000..ac571c7c04
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/WriteFailure.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.elasticsearch.bulk;
+
+import org.apache.metron.indexing.dao.update.Document;
+
+/**
+ * Indicates that a document failed to be written by a {@link BulkDocumentWriter}.
+ * @param <D> The type of document that failed to write.
+ */
+public class WriteFailure<D extends Document> {
+  private D document;
+  private Throwable cause;
+  private String message;
+
+  public WriteFailure(D document, Throwable cause, String message) {
+    this.document = document;
+    this.cause = cause;
+    this.message = message;
+  }
+
+  public D getDocument() {
+    return document;
+  }
+
+  public Throwable getCause() {
+    return cause;
+  }
+
+  public String getMessage() {
+    return message;
+  }
+}
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/WriteSuccess.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/WriteSuccess.java
new file mode 100644
index 0000000000..a86325d297
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/WriteSuccess.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.elasticsearch.bulk;
+
+import org.apache.metron.indexing.dao.update.Document;
+
+/**
+ * Indicates that a document was successfully written by a {@link BulkDocumentWriter}.
+ * @param <D> The type of document written.
+ */
+public class WriteSuccess<D extends Document> {
+  private D document;
+
+  public WriteSuccess(D document) {
+    this.document = document;
+  }
+
+  public D getDocument() {
+    return document;
+  }
+}
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
index 675d22f190..7226c30e5a 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
@@ -191,7 +191,7 @@ public ElasticsearchDao withRefreshPolicy(WriteRequest.RefreshPolicy refreshPoli
   }
 
   protected Optional<String> getIndexName(String guid, String sensorType) throws IOException {
-    return updateDao.getIndexName(guid, sensorType);
+    return updateDao.findIndexNameByGUID(guid, sensorType);
   }
 
   protected SearchResponse search(SearchRequest request, QueryBuilder queryBuilder)
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java
index ba852aaa0f..fa02f8df4d 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java
@@ -17,18 +17,9 @@
  */
 package org.apache.metron.elasticsearch.dao;
 
-import static org.apache.metron.indexing.dao.IndexDao.COMMENTS_FIELD;
-
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
+import org.apache.metron.elasticsearch.bulk.ElasticsearchBulkDocumentWriter;
+import org.apache.metron.elasticsearch.bulk.WriteFailure;
+import org.apache.metron.elasticsearch.bulk.BulkDocumentWriterResults;
 import org.apache.metron.elasticsearch.client.ElasticsearchClient;
 import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.apache.metron.indexing.dao.AccessConfig;
@@ -36,84 +27,80 @@
 import org.apache.metron.indexing.dao.update.CommentAddRemoveRequest;
 import org.apache.metron.indexing.dao.update.Document;
 import org.apache.metron.indexing.dao.update.UpdateDao;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.support.WriteRequest;
-import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.metron.indexing.dao.IndexDao.COMMENTS_FIELD;
+
+import static java.lang.String.format;
+
 public class ElasticsearchUpdateDao implements UpdateDao {
 
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private transient ElasticsearchClient client;
   private AccessConfig accessConfig;
   private ElasticsearchRetrieveLatestDao retrieveLatestDao;
-  private WriteRequest.RefreshPolicy refreshPolicy;
+  private ElasticsearchBulkDocumentWriter<Document> documentWriter;
 
   public ElasticsearchUpdateDao(ElasticsearchClient client,
       AccessConfig accessConfig,
       ElasticsearchRetrieveLatestDao searchDao) {
-    this.client = client;
     this.accessConfig = accessConfig;
     this.retrieveLatestDao = searchDao;
-    this.refreshPolicy = WriteRequest.RefreshPolicy.NONE;
+    this.documentWriter = new ElasticsearchBulkDocumentWriter<>(client)
+            .withRefreshPolicy(WriteRequest.RefreshPolicy.NONE);
   }
 
   @Override
   public Document update(Document update, Optional<String> index) throws IOException {
-    String indexPostfix = ElasticsearchUtils
-        .getIndexFormat(accessConfig.getGlobalConfigSupplier().get()).format(new Date());
-    String sensorType = update.getSensorType();
-    String indexName = getIndexName(update, index, indexPostfix);
-
-    IndexRequest indexRequest = buildIndexRequest(update, sensorType, indexName);
-    try {
-      IndexResponse response = client.getHighLevelClient().index(indexRequest);
-
-      ShardInfo shardInfo = response.getShardInfo();
-      int failed = shardInfo.getFailed();
-      if (failed > 0) {
-        throw new IOException(
-            "ElasticsearchDao index failed: " + Arrays.toString(shardInfo.getFailures()));
-      }
-    } catch (Exception e) {
-      throw new IOException(e.getMessage(), e);
-    }
-    return update;
+    Map<Document, Optional<String>> updates = new HashMap<>();
+    updates.put(update, index);
+
+    Map<Document, Optional<String>> results = batchUpdate(updates);
+    return results.keySet().iterator().next();
   }
 
   @Override
   public Map<Document, Optional<String>> batchUpdate(Map<Document, Optional<String>> updates) throws IOException {
-    String indexPostfix = ElasticsearchUtils
-        .getIndexFormat(accessConfig.getGlobalConfigSupplier().get()).format(new Date());
-
-    BulkRequest bulkRequestBuilder = new BulkRequest();
-    bulkRequestBuilder.setRefreshPolicy(refreshPolicy);
-
-    // Get the indices we'll actually be using for each Document.
-    for (Map.Entry<Document, Optional<String>> updateEntry : updates.entrySet()) {
-      Document update = updateEntry.getKey();
-      String sensorType = update.getSensorType();
-      String indexName = getIndexName(update, updateEntry.getValue(), indexPostfix);
-      IndexRequest indexRequest = buildIndexRequest(
-          update,
-          sensorType,
-          indexName
-      );
-
-      bulkRequestBuilder.add(indexRequest);
+    Map<String, Object> globalConfig = accessConfig.getGlobalConfigSupplier().get();
+    String indexPostfix = ElasticsearchUtils.getIndexFormat(globalConfig).format(new Date());
+
+    for (Map.Entry<Document, Optional<String>> entry : updates.entrySet()) {
+      Document document = entry.getKey();
+      Optional<String> optionalIndex = entry.getValue();
+      String indexName = optionalIndex.orElse(getIndexName(document, indexPostfix));
+      documentWriter.addDocument(document, indexName);
     }
 
-    BulkResponse bulkResponse = client.getHighLevelClient().bulk(bulkRequestBuilder);
-    if (bulkResponse.hasFailures()) {
-      LOG.error("Bulk Request has failures: {}", bulkResponse.buildFailureMessage());
-      throw new IOException(
-          "ElasticsearchDao upsert failed: " + bulkResponse.buildFailureMessage());
+    // write the documents. if any document fails, raise an exception.
+    BulkDocumentWriterResults<Document> results = documentWriter.write();
+    int failures = results.getFailures().size();
+    if(failures > 0) {
+      int successes = results.getSuccesses().size();
+      String msg = format("Failed to update all documents; %d successes, %d failures", successes, failures);
+      LOG.error(msg);
+
+      // log each individual failure
+      for(WriteFailure<Document> failure: results.getFailures()) {
+        LOG.error(failure.getMessage(), failure.getCause());
+      }
+
+      // raise an exception using the first exception as the root cause, although there may be many
+      Throwable cause = results.getFailures().get(0).getCause();
+      throw new IOException(msg, cause);
     }
+
     return updates;
   }
 
@@ -187,32 +174,19 @@ public Document removeCommentFromAlert(CommentAddRemoveRequest request, Document
   }
 
   public ElasticsearchUpdateDao withRefreshPolicy(WriteRequest.RefreshPolicy refreshPolicy) {
-    this.refreshPolicy = refreshPolicy;
+    documentWriter.withRefreshPolicy(refreshPolicy);
     return this;
   }
 
-  protected String getIndexName(Document update, Optional<String> index, String indexPostFix) throws IOException {
-    return index.orElse(getIndexName(update.getGuid(), update.getSensorType())
-        .orElse(ElasticsearchUtils.getIndexName(update.getSensorType(), indexPostFix, null))
-    );
+  protected String getIndexName(Document update, String indexPostFix) throws IOException {
+    return findIndexNameByGUID(update.getGuid(), update.getSensorType())
+            .orElse(ElasticsearchUtils.getIndexName(update.getSensorType(), indexPostFix, null));
   }
 
-  protected Optional<String> getIndexName(String guid, String sensorType) throws IOException {
-    return retrieveLatestDao.searchByGuid(guid,
-        sensorType,
-        hit -> Optional.ofNullable(hit.getIndex())
-    );
-  }
-
-  protected IndexRequest buildIndexRequest(Document update, String sensorType, String indexName) {
-    String type = sensorType + "_doc";
-    Object ts = update.getTimestamp();
-    IndexRequest indexRequest = new IndexRequest(indexName, type, update.getGuid())
-        .source(update.getDocument());
-    if (ts != null) {
-      indexRequest = indexRequest.timestamp(ts.toString());
-    }
-
-    return indexRequest;
+  protected Optional<String> findIndexNameByGUID(String guid, String sensorType) throws IOException {
+    return retrieveLatestDao.searchByGuid(
+            guid,
+            sensorType,
+            hit -> Optional.ofNullable(hit.getIndex()));
   }
 }
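
A short sketch of the refactored update path (the 'updateDao' instance and the
documents are assumed): a single update now flows through the same bulk-write
machinery as a batch, and an IOException is raised if any document fails.

    // a single update is just a batch of one
    Document updated = updateDao.update(document, Optional.of("bro_index_2018.10.05"));

    // larger batches share the same path
    Map<Document, Optional<String>> updates = new HashMap<>();
    updates.put(doc1, Optional.empty());  // index resolved by GUID lookup, else the index format
    updates.put(doc2, Optional.of("snort_index_2018.10.05"));
    updateDao.batchUpdate(updates);
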
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
index fbdd4fecb2..a3459d8ccd 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
@@ -17,21 +17,24 @@
  */
 package org.apache.metron.elasticsearch.writer;
 
+import com.google.common.collect.Lists;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.field.FieldNameConverter;
 import org.apache.metron.common.field.FieldNameConverters;
 import org.apache.metron.common.writer.BulkMessageWriter;
 import org.apache.metron.common.writer.BulkWriterResponse;
+import org.apache.metron.elasticsearch.bulk.BulkDocumentWriter;
+import org.apache.metron.elasticsearch.bulk.ElasticsearchBulkDocumentWriter;
+import org.apache.metron.elasticsearch.bulk.WriteFailure;
+import org.apache.metron.elasticsearch.bulk.WriteSuccess;
+import org.apache.metron.elasticsearch.bulk.BulkDocumentWriterResults;
 import org.apache.metron.elasticsearch.client.ElasticsearchClient;
 import org.apache.metron.elasticsearch.client.ElasticsearchClientFactory;
 import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
+import org.apache.metron.stellar.common.utils.ConversionUtils;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Tuple;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequest;
 import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,11 +42,14 @@
 import java.io.Serializable;
 import java.lang.invoke.MethodHandles;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.Date;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import static java.lang.String.format;
+import static org.apache.metron.stellar.common.Constants.Fields.TIMESTAMP;
+
 /**
  * A {@link BulkMessageWriter} that writes messages to Elasticsearch.
  */
@@ -56,90 +62,111 @@
    */
   private transient ElasticsearchClient client;
 
+  /**
+   * Responsible for writing documents.
+   *
+   * <p>Uses a {@link TupleBasedDocument} to maintain the relationship between
+   * a {@link Tuple} and the document created from the contents of that tuple. If
+   * a document cannot be written, the associated tuple needs to be failed.
+   */
+  private transient BulkDocumentWriter<TupleBasedDocument> documentWriter;
+
   /**
   * A simple date formatter used to build the appropriate Elasticsearch index name.
    */
   private SimpleDateFormat dateFormat;
 
-
   @Override
   public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration configurations) {
-
     Map<String, Object> globalConfiguration = configurations.getGlobalConfig();
-    client = ElasticsearchClientFactory.create(globalConfiguration);
     dateFormat = ElasticsearchUtils.getIndexFormat(globalConfiguration);
+
+    // only create the document writer if one does not already exist; useful for testing.
+    if(documentWriter == null) {
+      client = ElasticsearchClientFactory.create(globalConfiguration);
+      documentWriter = new ElasticsearchBulkDocumentWriter<>(client);
+    }
   }
 
   @Override
-  public BulkWriterResponse write(String sensorType, WriterConfiguration configurations, Iterable<Tuple> tuples, List<JSONObject> messages) throws Exception {
+  public BulkWriterResponse write(String sensorType,
+                                  WriterConfiguration configurations,
+                                  Iterable<Tuple> tuplesIter,
+                                  List<JSONObject> messages) {
 
     // fetch the field name converter for this sensor type
     FieldNameConverter fieldNameConverter = FieldNameConverters.create(sensorType, configurations);
+    String indexPostfix = dateFormat.format(new Date());
+    String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, configurations);
+
+    // the number of tuples must match the number of messages
+    List<Tuple> tuples = Lists.newArrayList(tuplesIter);
+    int batchSize = tuples.size();
+    if(messages.size() != batchSize) {
+      throw new IllegalStateException(format("Expect same number of tuples and messages; |tuples|=%d, |messages|=%d",
+              tuples.size(), messages.size()));
+    }
 
-    final String indexPostfix = dateFormat.format(new Date());
-    BulkRequest bulkRequest = new BulkRequest();
-    for(JSONObject message: messages) {
-
-      JSONObject esDoc = new JSONObject();
-      for(Object k : message.keySet()){
-        copyField(k.toString(), message, esDoc, fieldNameConverter);
-      }
-
-      String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, configurations);
-      IndexRequest indexRequest = new IndexRequest(indexName, sensorType + "_doc");
-      indexRequest.source(esDoc.toJSONString());
-      String guid = (String)esDoc.get(Constants.GUID);
-      if(guid != null) {
-        indexRequest.id(guid);
-      }
-
-      Object ts = esDoc.get("timestamp");
-      if(ts != null) {
-        indexRequest.timestamp(ts.toString());
-      }
-      bulkRequest.add(indexRequest);
+    // create a document from each message
+    for(int i=0; i<tuples.size(); i++) {
+      JSONObject message = messages.get(i);
+      Tuple tuple = tuples.get(i);
+      TupleBasedDocument document = createDocument(message, tuple, sensorType, fieldNameConverter);
+      documentWriter.addDocument(document, indexName);
     }
 
-    BulkResponse bulkResponse = client.getHighLevelClient().bulk(bulkRequest);
-    return buildWriteReponse(tuples, bulkResponse);
-  }
+    // write the documents
+    BulkDocumentWriterResults<TupleBasedDocument> results = documentWriter.write();
 
-  @Override
-  public String getName() {
-    return "elasticsearch";
+    // build the response
+    BulkWriterResponse response = new BulkWriterResponse();
+    for(WriteSuccess<TupleBasedDocument> success: results.getSuccesses()) {
+      response.addSuccess(success.getDocument().getTuple());
+    }
+    for(WriteFailure<TupleBasedDocument> failure: results.getFailures()) {
+      response.addError(failure.getCause(), failure.getDocument().getTuple());
+    }
+    return response;
   }
 
-  protected BulkWriterResponse buildWriteReponse(Iterable<Tuple> tuples, BulkResponse bulkResponse) throws Exception {
-    // Elasticsearch responses are in the same order as the request, giving us an implicit mapping with Tuples
-    BulkWriterResponse writerResponse = new BulkWriterResponse();
-    if (bulkResponse.hasFailures()) {
-      Iterator<BulkItemResponse> respIter = bulkResponse.iterator();
-      Iterator<Tuple> tupleIter = tuples.iterator();
-      while (respIter.hasNext() && tupleIter.hasNext()) {
-        BulkItemResponse item = respIter.next();
-        Tuple tuple = tupleIter.next();
-
-        if (item.isFailed()) {
-          writerResponse.addError(item.getFailure().getCause(), tuple);
-        } else {
-          writerResponse.addSuccess(tuple);
-        }
-
-        // Should never happen, so fail the entire batch if it does.
-        if (respIter.hasNext() != tupleIter.hasNext()) {
-          throw new Exception(bulkResponse.buildFailureMessage());
-        }
-      }
+  private TupleBasedDocument createDocument(JSONObject message,
+                                            Tuple tuple,
+                                            String sensorType,
+                                            FieldNameConverter fieldNameConverter) {
+    // transform the message fields to the source fields of the indexed document
+    JSONObject source = new JSONObject();
+    for(Object k : message.keySet()){
+      copyField(k.toString(), message, source, fieldNameConverter);
+    }
+
+    // define the document id
+    String guid = ConversionUtils.convert(source.get(Constants.GUID), String.class);
+    if(guid == null) {
+      LOG.warn("Missing '{}' field; document ID will be auto-generated.", Constants.GUID);
+    }
+
+    // define the document timestamp
+    Long timestamp = null;
+    Object value = source.get(TIMESTAMP.getName());
+    if(value != null) {
+      timestamp = Long.parseLong(value.toString());
     } else {
-      writerResponse.addAllSuccesses(tuples);
+      LOG.warn("Missing '{}' field; timestamp will be set to system time.", TIMESTAMP.getName());
     }
 
-    return writerResponse;
+    return new TupleBasedDocument(source, guid, sensorType, timestamp, tuple);
+  }
+
+  @Override
+  public String getName() {
+    return "elasticsearch";
   }
 
   @Override
   public void close() throws Exception {
-    client.close();
+    if(client != null) {
+      client.close();
+    }
   }
 
   /**
@@ -167,5 +194,13 @@ private void copyField(
     // copy the field
     destination.put(destinationFieldName, source.get(sourceFieldName));
   }
+
+  /**
+   * Set the document writer.  Primarily used for testing.
+   * @param documentWriter The {@link BulkDocumentWriter} to use.
+   */
+  public void setDocumentWriter(BulkDocumentWriter<TupleBasedDocument> documentWriter) {
+    this.documentWriter = documentWriter;
+  }
 }
 
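
A condensed sketch of the new write contract (the initialized 'esWriter',
'configurations', and the parallel 'tuples'/'messages' lists are assumed):

    BulkWriterResponse response = esWriter.write("bro", configurations, tuples, messages);
    if (response.hasErrors()) {
        // failed tuples are grouped under the Throwable that caused each failure
        for (Map.Entry<Throwable, Collection<Tuple>> entry : response.getErrors().entrySet()) {
            LOG.error("Failed to write {} tuple(s)", entry.getValue().size(), entry.getKey());
        }
    }
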
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/TupleBasedDocument.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/TupleBasedDocument.java
new file mode 100644
index 0000000000..ba44937b16
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/TupleBasedDocument.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.elasticsearch.writer;
+
+import org.apache.metron.indexing.dao.update.Document;
+import org.apache.storm.tuple.Tuple;
+
+import java.util.Map;
+
+/**
+ * A {@link Document} that is created from the contents of a {@link Tuple}.
+ */
+public class TupleBasedDocument extends Document {
+
+    private Tuple tuple;
+
+    public TupleBasedDocument(Map<String, Object> document,
+                              String guid,
+                              String sensorType,
+                              Long timestamp,
+                              Tuple tuple) {
+        super(document, guid, sensorType, timestamp);
+        this.tuple = tuple;
+    }
+
+    public Tuple getTuple() {
+        return tuple;
+    }
+}
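
For illustration (the 'collector' below is an assumed Storm OutputCollector),
carrying the tuple with the document is what lets a failed write be traced back
to, and failed against, the tuple that produced it:

    TupleBasedDocument document = new TupleBasedDocument(source, guid, "bro", timestamp, tuple);
    documentWriter.addDocument(document, indexName);
    for (WriteFailure<TupleBasedDocument> failure : documentWriter.write().getFailures()) {
        collector.fail(failure.getDocument().getTuple());
    }
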
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriterTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriterTest.java
new file mode 100644
index 0000000000..b313811fcf
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/bulk/ElasticsearchBulkDocumentWriterTest.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.elasticsearch.bulk;
+
+import org.apache.metron.common.Constants;
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
+import org.apache.metron.indexing.dao.update.Document;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.json.simple.JSONObject;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ElasticsearchBulkDocumentWriterTest {
+
+    ElasticsearchBulkDocumentWriter<Document> writer;
+    ElasticsearchClient client;
+    RestHighLevelClient highLevelClient;
+
+    @Before
+    public void setup() {
+        // mock Elasticsearch
+        highLevelClient = mock(RestHighLevelClient.class);
+        client = mock(ElasticsearchClient.class);
+        when(client.getHighLevelClient()).thenReturn(highLevelClient);
+
+        writer = new ElasticsearchBulkDocumentWriter<>(client);
+    }
+
+    @Test
+    public void testWriteSuccess() throws IOException {
+        setupElasticsearchToSucceed();
+
+        // write a document successfully
+        Document doc = document(message());
+        String index = "bro_index";
+        writer.addDocument(doc, index);
+
+        BulkDocumentWriterResults<Document> results = writer.write();
+        assertEquals(1, results.getSuccesses().size());
+        assertEquals(0, results.getFailures().size());
+
+        WriteSuccess<Document> success = results.getSuccesses().get(0);
+        assertEquals(doc, success.getDocument());
+    }
+
+    @Test
+    public void testWriteFailure() throws IOException {
+        setupElasticsearchToFail();
+
+        // the document will fail to write
+        Document doc = document(message());
+        String index = "bro_index";
+        writer.addDocument(doc, index);
+
+        BulkDocumentWriterResults<Document> results = writer.write();
+        assertEquals(0, results.getSuccesses().size());
+        assertEquals(1, results.getFailures().size());
+
+        WriteFailure<Document> failure = results.getFailures().get(0);
+        assertEquals(doc, failure.getDocument());
+        assertEquals("error message", failure.getMessage());
+        assertNotNull(failure.getCause());
+    }
+
+    @Test
+    public void testSizeWhenWriteSuccessful() throws IOException {
+        setupElasticsearchToSucceed();
+        assertEquals(0, writer.size());
+
+        // add some documents to write
+        String index = "bro_index";
+        writer.addDocument(document(message()), index);
+        writer.addDocument(document(message()), index);
+        writer.addDocument(document(message()), index);
+        writer.addDocument(document(message()), index);
+        writer.addDocument(document(message()), index);
+        assertEquals(5, writer.size());
+
+        // after the write, all documents should have been flushed
+        writer.write();
+        assertEquals(0, writer.size());
+    }
+
+    @Test
+    public void testSizeWhenWriteFails() throws IOException {
+        setupElasticsearchToFail();
+        assertEquals(0, writer.size());
+
+        // add some documents to write
+        String index = "bro_index";
+        writer.addDocument(document(message()), index);
+        writer.addDocument(document(message()), index);
+        writer.addDocument(document(message()), index);
+        writer.addDocument(document(message()), index);
+        writer.addDocument(document(message()), index);
+        assertEquals(5, writer.size());
+
+        // after the write, all documents should have been flushed
+        writer.write();
+        assertEquals(0, writer.size());
+    }
+
+    private void setupElasticsearchToFail() throws IOException {
+        // define the item failure
+        BulkItemResponse.Failure failure = mock(BulkItemResponse.Failure.class);
+        when(failure.getCause()).thenReturn(new Exception("test exception"));
+        when(failure.getMessage()).thenReturn("error message");
+
+        // define the item level response
+        BulkItemResponse itemResponse = mock(BulkItemResponse.class);
+        when(itemResponse.isFailed()).thenReturn(true);
+        when(itemResponse.getItemId()).thenReturn(0);
+        when(itemResponse.getFailure()).thenReturn(failure);
+        when(itemResponse.getFailureMessage()).thenReturn("error message");
+        List<BulkItemResponse> itemsResponses = Collections.singletonList(itemResponse);
+
+        // define the bulk response to indicate failure
+        BulkResponse response = mock(BulkResponse.class);
+        when(response.iterator()).thenReturn(itemsResponses.iterator());
+        when(response.hasFailures()).thenReturn(true);
+
+        // have the client return the mock response
+        when(highLevelClient.bulk(any(BulkRequest.class))).thenReturn(response);
+    }
+
+    private void setupElasticsearchToSucceed() throws IOException {
+        // define the bulk response to indicate success
+        BulkResponse response = mock(BulkResponse.class);
+        when(response.hasFailures()).thenReturn(false);
+
+        // have the client return the mock response
+        when(highLevelClient.bulk(any(BulkRequest.class))).thenReturn(response);
+    }
+
+    private Document document(JSONObject message) {
+        String guid = UUID.randomUUID().toString();
+        String sensorType = "bro";
+        Long timestamp = System.currentTimeMillis();
+        return new Document(message, guid, sensorType, timestamp);
+    }
+
+    private JSONObject message() {
+        JSONObject message = new JSONObject();
+        message.put(Constants.GUID, UUID.randomUUID().toString());
+        message.put(Constants.Fields.TIMESTAMP.getName(), System.currentTimeMillis());
+        message.put(Constants.Fields.SRC_ADDR.getName(), "192.168.1.1");
+        return message;
+    }
+}
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java
index 227f5ef73e..dfdf88e495 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java
@@ -17,40 +17,11 @@
  */
 package org.apache.metron.elasticsearch.integration.components;
 
-import static java.util.Arrays.asList;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.function.BooleanSupplier;
-
-import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.commons.io.FileUtils;
-import org.apache.metron.common.Constants;
-import org.apache.metron.common.utils.JSONUtils;
-import org.apache.metron.elasticsearch.client.ElasticsearchClient;
-import org.apache.metron.elasticsearch.client.ElasticsearchClientFactory;
-import org.apache.metron.elasticsearch.dao.ElasticsearchColumnMetadataDao;
 import org.apache.metron.elasticsearch.dao.ElasticsearchDao;
-import org.apache.metron.elasticsearch.dao.ElasticsearchRequestSubmitter;
-import org.apache.metron.elasticsearch.dao.ElasticsearchRetrieveLatestDao;
-import org.apache.metron.elasticsearch.dao.ElasticsearchSearchDao;
-import org.apache.metron.elasticsearch.dao.ElasticsearchUpdateDao;
-import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.IndexDao;
-import org.apache.metron.indexing.dao.search.InvalidSearchException;
-import org.apache.metron.indexing.dao.search.SearchRequest;
 import org.apache.metron.indexing.dao.update.Document;
-import org.apache.metron.indexing.dao.update.UpdateDao;
 import org.apache.metron.integration.InMemoryComponent;
 import org.apache.metron.integration.UnableToStartException;
 import org.apache.metron.stellar.common.utils.ConversionUtils;
@@ -62,9 +33,6 @@
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
 import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.client.Client;
@@ -76,11 +44,23 @@
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.transport.Netty4Plugin;
-import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
 import org.json.simple.parser.ParseException;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static java.util.Arrays.asList;
+
 public class ElasticSearchComponent implements InMemoryComponent {
 
   private static class Mapping {
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java
index 6a3638b93c..e5e85b090f 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java
@@ -18,170 +18,290 @@
 
 package org.apache.metron.elasticsearch.writer;
 
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.writer.BulkWriterResponse;
+import org.apache.metron.elasticsearch.bulk.BulkDocumentWriter;
+import org.apache.metron.elasticsearch.bulk.BulkDocumentWriterResults;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Tuple;
+import org.json.simple.JSONObject;
+import org.junit.Before;
+import org.junit.Test;
 
-import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-import org.apache.metron.common.writer.BulkWriterResponse;
-import org.apache.storm.tuple.Tuple;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.junit.Test;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class ElasticsearchWriterTest {
-    @Test
-    public void testSingleSuccesses() throws Exception {
-        Tuple tuple1 = mock(Tuple.class);
 
-        BulkResponse response = mock(BulkResponse.class);
-        when(response.hasFailures()).thenReturn(false);
+    Map stormConf;
+    TopologyContext topologyContext;
+    WriterConfiguration writerConfiguration;
 
-        BulkWriterResponse expected = new BulkWriterResponse();
-        expected.addSuccess(tuple1);
+    @Before
+    public void setup() {
+        topologyContext = mock(TopologyContext.class);
 
-        ElasticsearchWriter esWriter = new ElasticsearchWriter();
-        BulkWriterResponse actual = esWriter.buildWriteReponse(ImmutableList.of(tuple1), response);
+        writerConfiguration = mock(WriterConfiguration.class);
+        when(writerConfiguration.getGlobalConfig()).thenReturn(globals());
 
-        assertEquals("Response should have no errors and single success", expected, actual);
+        stormConf = new HashMap();
     }
 
     @Test
-    public void testMultipleSuccesses() throws Exception {
-        Tuple tuple1 = mock(Tuple.class);
-        Tuple tuple2 = mock(Tuple.class);
-
-        BulkResponse response = mock(BulkResponse.class);
-        when(response.hasFailures()).thenReturn(false);
-
-        BulkWriterResponse expected = new BulkWriterResponse();
-        expected.addSuccess(tuple1);
-        expected.addSuccess(tuple2);
-
+    public void shouldWriteSuccessfully() {
+        // create a tuple and a message associated with that tuple
+        List<Tuple> tuples = createTuples(1);
+        List<JSONObject> messages = createMessages(1);
+
+        // create a document writer which will successfully write all
+        BulkDocumentWriterResults<TupleBasedDocument> results = new BulkDocumentWriterResults<>();
+        results.addSuccess(createDocument(messages.get(0), tuples.get(0)));
+        BulkDocumentWriter<TupleBasedDocument> docWriter = mock(BulkDocumentWriter.class);
+        when(docWriter.write()).thenReturn(results);
+
+        // attempt to write
         ElasticsearchWriter esWriter = new ElasticsearchWriter();
-        BulkWriterResponse actual = esWriter.buildWriteReponse(ImmutableList.of(tuple1, tuple2), response);
+        esWriter.setDocumentWriter(docWriter);
+        esWriter.init(stormConf, topologyContext, writerConfiguration);
+        BulkWriterResponse response = esWriter.write("bro", writerConfiguration, tuples, messages);
 
-        assertEquals("Response should have no errors and two successes", expected, actual);
+        // response should only contain successes
+        assertFalse(response.hasErrors());
+        assertTrue(response.getSuccesses().contains(tuples.get(0)));
     }
 
     @Test
-    public void testSingleFailure() throws Exception {
-        Tuple tuple1 = mock(Tuple.class);
-
-        BulkResponse response = mock(BulkResponse.class);
-        when(response.hasFailures()).thenReturn(true);
-
-        Exception e = new IllegalStateException();
-        BulkItemResponse itemResponse = buildBulkItemFailure(e);
-        when(response.iterator()).thenReturn(ImmutableList.of(itemResponse).iterator());
-
-        BulkWriterResponse expected = new BulkWriterResponse();
-        expected.addError(e, tuple1);
-
+    public void shouldWriteManySuccessfully() {
+        // create a few tuples and the messages associated with the tuples
+        List<Tuple> tuples = createTuples(3);
+        List<JSONObject> messages = createMessages(3);
+
+        // create a document writer which will successfully write all
+        BulkDocumentWriterResults<TupleBasedDocument> results = new BulkDocumentWriterResults<>();
+        results.addSuccess(createDocument(messages.get(0), tuples.get(0)));
+        results.addSuccess(createDocument(messages.get(1), tuples.get(1)));
+        results.addSuccess(createDocument(messages.get(2), tuples.get(2)));
+        BulkDocumentWriter<TupleBasedDocument> docWriter = mock(BulkDocumentWriter.class);
+        when(docWriter.write()).thenReturn(results);
+
+        // attempt to write
         ElasticsearchWriter esWriter = new ElasticsearchWriter();
-        BulkWriterResponse actual = esWriter.buildWriteReponse(ImmutableList.of(tuple1), response);
-
-        assertEquals("Response should have one error and zero successes", expected, actual);
+        esWriter.setDocumentWriter(docWriter);
+        esWriter.init(stormConf, topologyContext, writerConfiguration);
+        BulkWriterResponse response = esWriter.write("bro", writerConfiguration, tuples, messages);
+
+        // response should only contain successes
+        assertFalse(response.hasErrors());
+        assertTrue(response.getSuccesses().contains(tuples.get(0)));
+        assertTrue(response.getSuccesses().contains(tuples.get(1)));
+        assertTrue(response.getSuccesses().contains(tuples.get(2)));
     }
 
     @Test
-    public void testTwoSameFailure() throws Exception {
-        Tuple tuple1 = mock(Tuple.class);
-        Tuple tuple2 = mock(Tuple.class);
-
-        BulkResponse response = mock(BulkResponse.class);
-        when(response.hasFailures()).thenReturn(true);
-
-        Exception e = new IllegalStateException();
-
-        BulkItemResponse itemResponse = buildBulkItemFailure(e);
-        BulkItemResponse itemResponse2 = buildBulkItemFailure(e);
-
-        when(response.iterator()).thenReturn(ImmutableList.of(itemResponse, itemResponse2).iterator());
-
-        BulkWriterResponse expected = new BulkWriterResponse();
-        expected.addError(e, tuple1);
-        expected.addError(e, tuple2);
-
+    public void shouldHandleWriteFailure() {
+        // create a tuple and a message associated with that tuple
+        List<Tuple> tuples = createTuples(1);
+        List<JSONObject> messages = createMessages(1);
+        Exception cause = new Exception();
+
+        // create a document writer which will fail all writes
+        BulkDocumentWriterResults<TupleBasedDocument> results = new BulkDocumentWriterResults<>();
+        results.addFailure(createDocument(messages.get(0), tuples.get(0)), cause, "error");
+        BulkDocumentWriter<TupleBasedDocument> docWriter = mock(BulkDocumentWriter.class);
+        when(docWriter.write()).thenReturn(results);
+
+        // attempt to write
         ElasticsearchWriter esWriter = new ElasticsearchWriter();
-        BulkWriterResponse actual = esWriter.buildWriteReponse(ImmutableList.of(tuple1, tuple2), response);
-
-        assertEquals("Response should have two errors and no successes", expected, actual);
-
-        // Ensure the errors actually get collapsed together
-        Map<Throwable, Collection<Tuple>> actualErrors = actual.getErrors();
-        HashMap<Throwable, Collection<Tuple>> expectedErrors = new HashMap<>();
-        expectedErrors.put(e, ImmutableList.of(tuple1, tuple2));
-        assertEquals("Errors should have collapsed together", expectedErrors, 
actualErrors);
+        esWriter.setDocumentWriter(docWriter);
+        esWriter.init(stormConf, topologyContext, writerConfiguration);
+        BulkWriterResponse response = esWriter.write("bro", writerConfiguration, tuples, messages);
+
+        // the writer response should only contain failures
+        assertEquals(0, response.getSuccesses().size());
+        assertEquals(1, response.getErrors().size());
+        Collection<Tuple> errors = response.getErrors().get(cause);
+        assertTrue(errors.contains(tuples.get(0)));
     }
 
     @Test
-    public void testTwoDifferentFailure() throws Exception {
-        Tuple tuple1 = mock(Tuple.class);
-        Tuple tuple2 = mock(Tuple.class);
+    public void shouldHandleManyWriteFailures() {
+        // create a few tuples and the messages associated with the tuples
+        int count = 3;
+        List<Tuple> tuples = createTuples(count);
+        List<JSONObject> messages = createMessages(count);
+        Exception cause = new Exception();
+
+        // create a document writer which will fail all writes
+        BulkDocumentWriterResults<TupleBasedDocument> results = new BulkDocumentWriterResults<>();
+        results.addFailure(createDocument(messages.get(0), tuples.get(0)), cause, "error");
+        results.addFailure(createDocument(messages.get(1), tuples.get(1)), cause, "error");
+        results.addFailure(createDocument(messages.get(2), tuples.get(2)), cause, "error");
+        BulkDocumentWriter<TupleBasedDocument> docWriter = mock(BulkDocumentWriter.class);
+        when(docWriter.write()).thenReturn(results);
+
+        // attempt to write
+        ElasticsearchWriter esWriter = new ElasticsearchWriter();
+        esWriter.setDocumentWriter(docWriter);
+        esWriter.init(stormConf, topologyContext, writerConfiguration);
+        BulkWriterResponse response = esWriter.write("bro", writerConfiguration, tuples, messages);
+
+        // the writer response should only contain failures
+        assertEquals(0, response.getSuccesses().size());
+        assertEquals(1, response.getErrors().size());
+        Collection<Tuple> errors = response.getErrors().get(cause);
+        assertTrue(errors.contains(tuples.get(0)));
+        assertTrue(errors.contains(tuples.get(1)));
+        assertTrue(errors.contains(tuples.get(2)));
+    }
 
-        BulkResponse response = mock(BulkResponse.class);
-        when(response.hasFailures()).thenReturn(true);
+    @Test
+    public void shouldHandlePartialFailures() {
+        // create a few tuples and the messages associated with the tuples
+        int count = 2;
+        List<Tuple> tuples = createTuples(count);
+        List<JSONObject> messages = createMessages(count);
+        Exception cause = new Exception();
+
+        // create a document writer that will fail one and succeed the other
+        BulkDocumentWriterResults<TupleBasedDocument> results = new BulkDocumentWriterResults<>();
+        results.addFailure(createDocument(messages.get(0), tuples.get(0)), cause, "error");
+        results.addSuccess(createDocument(messages.get(1), tuples.get(1)));
+        BulkDocumentWriter<TupleBasedDocument> docWriter = mock(BulkDocumentWriter.class);
+        when(docWriter.write()).thenReturn(results);
+
+        // attempt to write
+        ElasticsearchWriter esWriter = new ElasticsearchWriter();
+        esWriter.setDocumentWriter(docWriter);
+        esWriter.init(stormConf, topologyContext, writerConfiguration);
+        BulkWriterResponse response = esWriter.write("bro", writerConfiguration, tuples, messages);
+
+        // response should contain some successes and some failures
+        assertEquals(1, response.getSuccesses().size());
+        assertEquals(1, response.getErrors().size());
+        assertTrue(response.getErrors().get(cause).contains(tuples.get(0)));
+        assertTrue(response.getSuccesses().contains(tuples.get(1)));
+    }
 
-        Exception e = new IllegalStateException("Cause");
-        Exception e2 = new IllegalStateException("Different Cause");
-        BulkItemResponse itemResponse = buildBulkItemFailure(e);
-        BulkItemResponse itemResponse2 = buildBulkItemFailure(e2);
+    @Test(expected = IllegalStateException.class)
+    public void shouldCheckIfNumberOfMessagesMatchNumberOfTuples() {
+        ElasticsearchWriter esWriter = new ElasticsearchWriter();
+        esWriter.setDocumentWriter(mock(BulkDocumentWriter.class));
+        esWriter.init(stormConf, topologyContext, writerConfiguration);
 
-        when(response.iterator()).thenReturn(ImmutableList.of(itemResponse, itemResponse2).iterator());
+        // there are 5 tuples and only 1 message; there should be 5 messages to match the number of tuples
+        List<Tuple> tuples = createTuples(5);
+        List<JSONObject> messages = createMessages(1);
 
-        BulkWriterResponse expected = new BulkWriterResponse();
-        expected.addError(e, tuple1);
-        expected.addError(e2, tuple2);
+        esWriter.write("bro", writerConfiguration, tuples, messages);
+        fail("expected exception");
+    }
 
+    @Test
+    public void shouldWriteSuccessfullyWhenMessageTimestampIsString() {
+        List<Tuple> tuples = createTuples(1);
+        List<JSONObject> messages = createMessages(1);
+
+        // the timestamp is a String, rather than a Long
+        messages.get(0).put(Constants.Fields.TIMESTAMP.getName(), new Long(System.currentTimeMillis()).toString());
+
+        // create the document
+        JSONObject message = messages.get(0);
+        String timestamp = (String) message.get(Constants.Fields.TIMESTAMP.getName());
+        String guid = (String) message.get(Constants.GUID);
+        String sensorType = (String) message.get(Constants.SENSOR_TYPE);
+        TupleBasedDocument document = new TupleBasedDocument(message, guid, sensorType, Long.parseLong(timestamp), tuples.get(0));
+
+        // create a document writer which will successfully write that document
+        BulkDocumentWriterResults<TupleBasedDocument> results = new BulkDocumentWriterResults<>();
+        results.addSuccess(document);
+        BulkDocumentWriter<TupleBasedDocument> docWriter = mock(BulkDocumentWriter.class);
+        when(docWriter.write()).thenReturn(results);
+
+        // attempt to write
         ElasticsearchWriter esWriter = new ElasticsearchWriter();
-        BulkWriterResponse actual = esWriter.buildWriteReponse(ImmutableList.of(tuple1, tuple2), response);
+        esWriter.setDocumentWriter(docWriter);
+        esWriter.init(stormConf, topologyContext, writerConfiguration);
+        BulkWriterResponse response = esWriter.write("bro", writerConfiguration, tuples, messages);
 
-        assertEquals("Response should have two errors and no successes", 
expected, actual);
-
-        // Ensure the errors did not get collapsed together
-        Map<Throwable, Collection<Tuple>> actualErrors = actual.getErrors();
-        HashMap<Throwable, Collection<Tuple>> expectedErrors = new HashMap<>();
-        expectedErrors.put(e, ImmutableList.of(tuple1));
-        expectedErrors.put(e2, ImmutableList.of(tuple2));
-        assertEquals("Errors should not have collapsed together", 
expectedErrors, actualErrors);
+        // response should only contain successes
+        assertFalse(response.hasErrors());
+        assertTrue(response.getSuccesses().contains(tuples.get(0)));
     }
 
     @Test
-    public void testSuccessAndFailure() throws Exception {
-        Tuple tuple1 = mock(Tuple.class);
-        Tuple tuple2 = mock(Tuple.class);
+    public void shouldWriteSuccessfullyWhenMissingGUID() {
+        // create a tuple and a message associated with that tuple
+        List<Tuple> tuples = createTuples(1);
+        List<JSONObject> messages = createMessages(1);
 
-        BulkResponse response = mock(BulkResponse.class);
-        when(response.hasFailures()).thenReturn(true);
+        // remove the GUID from the message
+        assertNotNull(messages.get(0).remove(Constants.GUID));
 
-        Exception e = new IllegalStateException("Cause");
-        BulkItemResponse itemResponse = buildBulkItemFailure(e);
+        // create a document writer which will successfully write all
+        BulkDocumentWriterResults<TupleBasedDocument> results = new BulkDocumentWriterResults<>();
+        results.addSuccess(createDocument(messages.get(0), tuples.get(0)));
+        BulkDocumentWriter<TupleBasedDocument> docWriter = mock(BulkDocumentWriter.class);
+        when(docWriter.write()).thenReturn(results);
 
-        BulkItemResponse itemResponse2 = mock(BulkItemResponse.class);
-        when(itemResponse2.isFailed()).thenReturn(false);
+        // attempt to write
+        ElasticsearchWriter esWriter = new ElasticsearchWriter();
+        esWriter.setDocumentWriter(docWriter);
+        esWriter.init(stormConf, topologyContext, writerConfiguration);
+        BulkWriterResponse response = esWriter.write("bro", writerConfiguration, tuples, messages);
 
-        when(response.iterator()).thenReturn(ImmutableList.of(itemResponse, 
itemResponse2).iterator());
+        // response should only contain successes
+        assertFalse(response.hasErrors());
+        assertTrue(response.getSuccesses().contains(tuples.get(0)));
+    }
 
-        BulkWriterResponse expected = new BulkWriterResponse();
-        expected.addError(e, tuple1);
-        expected.addSuccess(tuple2);
+    private TupleBasedDocument createDocument(JSONObject message, Tuple tuple) {
+        Long timestamp = (Long) message.get(Constants.Fields.TIMESTAMP.getName());
+        String guid = (String) message.get(Constants.GUID);
+        String sensorType = (String) message.get(Constants.SENSOR_TYPE);
+        return new TupleBasedDocument(message, guid, sensorType, timestamp, tuple);
+    }
 
-        ElasticsearchWriter esWriter = new ElasticsearchWriter();
-        BulkWriterResponse actual = esWriter.buildWriteReponse(ImmutableList.of(tuple1, tuple2), response);
+    private JSONObject message() {
+        JSONObject message = new JSONObject();
+        message.put(Constants.GUID, UUID.randomUUID().toString());
+        message.put(Constants.Fields.TIMESTAMP.getName(), System.currentTimeMillis());
+        message.put(Constants.Fields.SRC_ADDR.getName(), "192.168.1.1");
+        message.put(Constants.SENSOR_TYPE, "sensor");
+        return message;
+    }
+
+    private Map<String, Object> globals() {
+        Map<String, Object> globals = new HashMap<>();
+        globals.put("es.date.format", "yyyy.MM.dd.HH");
+        return globals;
+    }
 
-        assertEquals("Response should have one error and one success", 
expected, actual);
+    private List<Tuple> createTuples(int count) {
+        List<Tuple> tuples = new ArrayList<>();
+        for(int i=0; i<count; i++) {
+            tuples.add(mock(Tuple.class));
+        }
+        return tuples;
     }
 
-    private BulkItemResponse buildBulkItemFailure(Exception e) {
-        BulkItemResponse itemResponse = mock(BulkItemResponse.class);
-        when(itemResponse.isFailed()).thenReturn(true);
-        BulkItemResponse.Failure failure = mock(BulkItemResponse.Failure.class);
-        when(itemResponse.getFailure()).thenReturn(failure);
-        when(failure.getCause()).thenReturn(e);
-        return itemResponse;
+    private List<JSONObject> createMessages(int count) {
+        List<JSONObject> messages = new ArrayList<>();
+        for(int i=0; i<count; i++) {
+            messages.add(message());
+        }
+        return messages;
     }
 }
diff --git a/metron-platform/metron-indexing/src/test/resources/log4j.properties b/metron-platform/metron-indexing/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..e69de29bb2
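
The new tests above exercise ElasticsearchWriter through the BulkDocumentWriter
contract: the writer hands one TupleBasedDocument per message to the document
writer, then folds the returned BulkDocumentWriterResults back into a
BulkWriterResponse, mapping each successful document to its tuple and each
failed document's tuple under the failure's cause. A minimal sketch of that
fold, assuming hypothetical names (WriteSuccess, WriteFailure, getSuccesses(),
getFailures(), getDocument(), getCause(), getTuple()) for anything not shown
in the diff:

    // Sketch only, not code from this PR; WriteSuccess/WriteFailure and the
    // accessors below are assumed names for illustration.
    BulkWriterResponse response = new BulkWriterResponse();
    BulkDocumentWriterResults<TupleBasedDocument> results = documentWriter.write();
    for (WriteSuccess<TupleBasedDocument> success : results.getSuccesses()) {
        response.addSuccess(success.getDocument().getTuple());
    }
    for (WriteFailure<TupleBasedDocument> failure : results.getFailures()) {
        response.addError(failure.getCause(), failure.getDocument().getTuple());
    }

Because BulkWriterResponse keys errors by cause, several failed documents that
share one cause collapse into a single error entry, which is exactly what
shouldHandleManyWriteFailures asserts via response.getErrors().get(cause).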


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
