This is an automated email from the ASF dual-hosted git repository.

mthomsen pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new c45b841392 NIFI-11111 add option to output Elasticsearch error 
responses as FlowFile to PutElasticsearchJson and PutElasticsearchRecord; 
NIFI-11111 clarify error_responses relationships in PutElasticsearchJson/Record 
processors; NIFI-11111 Refactor exception handling for error response flowfile 
transfer; NIFI-11111 Add elasticsearch.bulk.error attributes containing the 
Elasticsearch _bulk response for error documents in PutElasticsearchJson
c45b841392 is described below

commit c45b841392cc8671fe504374540eb5cca23903ff
Author: Chris Sampson <chris.sampso...@gmail.com>
AuthorDate: Sun Jan 29 19:32:27 2023 +0000

    NIFI-11111 add option to output Elasticsearch error responses as FlowFile 
to PutElasticsearchJson and PutElasticsearchRecord
    NIFI-11111 clarify error_responses relationships in 
PutElasticsearchJson/Record processors
    NIFI-11111 Refactor exception handling for error response flowfile transfer
    NIFI-11111 Add elasticsearch.bulk.error attributes containing the 
Elasticsearch _bulk response for error documents in PutElasticsearchJson
    
    This closes #6903
    
    Signed-off-by: Mike Thomsen <mthom...@apache.org>
---
 .../elasticsearch/AbstractPutElasticsearch.java    | 104 +++++++++++++++++----
 .../elasticsearch/PutElasticsearchJson.java        |  67 +++++++------
 .../elasticsearch/PutElasticsearchRecord.java      |  41 ++++----
 .../AbstractPutElasticsearchTest.groovy            |  50 ++++++++++
 .../elasticsearch/PutElasticsearchJsonTest.groovy  |  51 ++++++++--
 .../PutElasticsearchRecordTest.groovy              |  42 +++++++--
 .../integration/AbstractElasticsearchITBase.java   |   2 +-
 nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml |   2 +-
 8 files changed, 273 insertions(+), 86 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearch.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearch.java
index af12d18102..0ab0a504f7 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearch.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearch.java
@@ -17,7 +17,6 @@
 
 package org.apache.nifi.processors.elasticsearch;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializationFeature;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
@@ -37,12 +36,17 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.util.StandardValidators;
 
+import java.io.IOException;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;
 
@@ -67,6 +71,16 @@ public abstract class AbstractPutElasticsearch extends 
AbstractProcessor impleme
         .required(true)
         .build();
 
+    static final PropertyDescriptor OUTPUT_ERROR_RESPONSES = new 
PropertyDescriptor.Builder()
+            .name("put-es-output-error-responses")
+            .displayName("Output Error Responses")
+            .description("If this is enabled, response messages from 
Elasticsearch marked as \"error\" will be output to the \"error_responses\" 
relationship." +
+                    "This does not impact the output of flowfiles to the 
\"success\" or \"errors\" relationships")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
     static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
             .description("All flowfiles that succeed in being transferred into 
Elasticsearch go here. " +
@@ -74,6 +88,12 @@ public abstract class AbstractPutElasticsearch extends 
AbstractProcessor impleme
                     "The Elasticsearch response will need to be examined to 
determine whether any Document(s)/Record(s) resulted in errors.")
             .build();
 
+    static final Relationship REL_ERROR_RESPONSES = new Relationship.Builder()
+            .name("error_responses")
+            .description("Elasticsearch _bulk API responses marked as 
\"error\" go here " +
+                    "(and optionally \"not_found\" when \"Treat \"Not Found\" 
as Error\" is \"true\").")
+            .build();
+
     static final List<String> ALLOWED_INDEX_OPERATIONS = 
Collections.unmodifiableList(Arrays.asList(
             IndexOperationRequest.Operation.Create.getValue().toLowerCase(),
             IndexOperationRequest.Operation.Delete.getValue().toLowerCase(),
@@ -82,12 +102,22 @@ public abstract class AbstractPutElasticsearch extends 
AbstractProcessor impleme
             IndexOperationRequest.Operation.Upsert.getValue().toLowerCase()
     ));
 
+    private final AtomicReference<Set<Relationship>> relationships = new 
AtomicReference<>(getBaseRelationships());
+
     boolean logErrors;
+    boolean outputErrorResponses;
     boolean notFoundIsSuccessful;
     ObjectMapper errorMapper;
 
     final AtomicReference<ElasticSearchClientService> clientService = new 
AtomicReference<>(null);
 
+    abstract Set<Relationship> getBaseRelationships();
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships.get();
+    }
+
     @Override
     protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
         return new PropertyDescriptor.Builder()
@@ -99,6 +129,17 @@ public abstract class AbstractPutElasticsearch extends 
AbstractProcessor impleme
                 .build();
     }
 
+    @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
+        if (OUTPUT_ERROR_RESPONSES.equals(descriptor)) {
+            final Set<Relationship> newRelationships = new 
HashSet<>(getBaseRelationships());
+            if (Boolean.parseBoolean(newValue)) {
+                newRelationships.add(REL_ERROR_RESPONSES);
+            }
+            relationships.set(newRelationships);
+        }
+    }
+
     @Override
     public boolean isIndexNotExistSuccessful() {
         // index can be created during _bulk index/create operation
@@ -110,8 +151,9 @@ public abstract class AbstractPutElasticsearch extends 
AbstractProcessor impleme
         
clientService.set(context.getProperty(CLIENT_SERVICE).asControllerService(ElasticSearchClientService.class));
 
         this.logErrors = context.getProperty(LOG_ERROR_RESPONSES).asBoolean();
+        this.outputErrorResponses = 
context.getProperty(OUTPUT_ERROR_RESPONSES).asBoolean();
 
-        if (errorMapper == null && (logErrors || 
getLogger().isDebugEnabled())) {
+        if (errorMapper == null && (outputErrorResponses || logErrors || 
getLogger().isDebugEnabled())) {
             errorMapper = new ObjectMapper();
             errorMapper.enable(SerializationFeature.INDENT_OUTPUT);
         }
@@ -158,15 +200,35 @@ public abstract class AbstractPutElasticsearch extends 
AbstractProcessor impleme
         }
     }
 
-    void logElasticsearchDocumentErrors(final IndexOperationResponse response) 
throws JsonProcessingException {
-        if (logErrors || getLogger().isDebugEnabled()) {
-            final List<Map<String, Object>> errors = response.getItems();
-            final String output = String.format("An error was encountered 
while processing bulk operations. Server response below:%n%n%s", 
errorMapper.writeValueAsString(errors));
+    void handleElasticsearchDocumentErrors(final Map<Integer, Map<String, 
Object>> errors, final ProcessSession session, final FlowFile parent) throws 
IOException {
+        if (!errors.isEmpty() && (outputErrorResponses || logErrors || 
getLogger().isDebugEnabled())) {
+            if (logErrors || getLogger().isDebugEnabled()) {
+                final String output = String.format(
+                        "An error was encountered while processing bulk 
operations. Server response below:%n%n%s",
+                        errorMapper.writeValueAsString(errors.values())
+                );
 
-            if (logErrors) {
-                getLogger().error(output);
-            } else {
-                getLogger().debug(output);
+                if (logErrors) {
+                    getLogger().error(output);
+                } else {
+                    getLogger().debug(output);
+                }
+            }
+
+            if (outputErrorResponses) {
+                FlowFile errorResponsesFF = null;
+                try {
+                    errorResponsesFF = session.create(parent);
+                    try (final OutputStream errorsOutputStream = 
session.write(errorResponsesFF)) {
+                        errorMapper.writeValue(errorsOutputStream, 
errors.values());
+                    }
+                    errorResponsesFF = session.putAttribute(errorResponsesFF, 
"elasticsearch.put.error.count", String.valueOf(errors.size()));
+                    session.transfer(errorResponsesFF, REL_ERROR_RESPONSES);
+                } catch (final IOException ex) {
+                    getLogger().error("Unable to write error responses", ex);
+                    session.remove(errorResponsesFF);
+                    throw ex;
+                }
             }
         }
     }
@@ -179,21 +241,29 @@ public abstract class AbstractPutElasticsearch extends 
AbstractProcessor impleme
         return inner -> inner.containsKey("result") && 
"not_found".equals(inner.get("result"));
     }
 
-    @SafeVarargs
-    final List<Integer> findElasticsearchResponseIndices(final 
IndexOperationResponse response, final Predicate<Map<String, Object>>... 
responseItemFilter) {
-        final List<Integer> indices = new ArrayList<>(response.getItems() == 
null ? 0 : response.getItems().size());
-        if (response.getItems() != null) {
+    final Map<Integer, Map<String, Object>> 
findElasticsearchResponseErrors(final IndexOperationResponse response) {
+        final Map<Integer, Map<String, Object>> errors = new 
LinkedHashMap<>(response.getItems() == null ? 0 : response.getItems().size(), 
1);
+
+        final List<Predicate<Map<String, Object>>> errorItemFilters = new 
ArrayList<>(2);
+        if (response.hasErrors()) {
+            errorItemFilters.add(isElasticsearchError());
+        }
+        if (!notFoundIsSuccessful) {
+            errorItemFilters.add(isElasticsearchNotFound());
+        }
+
+        if (response.getItems() != null && !errorItemFilters.isEmpty()) {
             for (int index = 0; index < response.getItems().size(); index++) {
                 final Map<String, Object> current = 
response.getItems().get(index);
                 if (!current.isEmpty()) {
                     final String key = 
current.keySet().stream().findFirst().orElse(null);
                     @SuppressWarnings("unchecked") final Map<String, Object> 
inner = (Map<String, Object>) current.get(key);
-                    if (inner != null && 
Arrays.stream(responseItemFilter).anyMatch(p -> p.test(inner))) {
-                        indices.add(index);
+                    if (inner != null && errorItemFilters.stream().anyMatch(p 
-> p.test(inner))) {
+                        errors.put(index, inner);
                     }
                 }
             }
         }
-        return indices;
+        return errors;
     }
 }
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java
index aac7e00d1f..f17f015813 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchJson.java
@@ -51,14 +51,15 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
 @Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", 
"elasticsearch7", "elasticsearch8", "put", "index"})
 @CapabilityDescription("An Elasticsearch put processor that uses the official 
Elastic REST client libraries.")
 @WritesAttributes({
-        @WritesAttribute(attribute = "elasticsearch.put.error", description = 
"The error message provided by Elasticsearch if there is an error indexing the 
document.")
+        @WritesAttribute(attribute = "elasticsearch.put.error",
+                description = "The error message if there is an issue parsing 
the FlowFile, sending the parsed document to Elasticsearch or parsing the 
Elasticsearch response"),
+        @WritesAttribute(attribute = "elasticsearch.bulk.error", description = 
"The _bulk response if there was an error during processing the document within 
Elasticsearch.")
 })
 @DynamicProperty(
         name = "The name of a URL query parameter to add",
@@ -101,7 +102,8 @@ public class PutElasticsearchJson extends 
AbstractPutElasticsearch {
         .name("put-es-json-error-documents")
         .displayName("Output Error Documents")
         .description("If this configuration property is true, the response 
from Elasticsearch will be examined for failed documents " +
-                "and the FlowFile(s) associated with the failed document(s) 
will be sent to the \"" + REL_FAILED_DOCUMENTS.getName() + "\" relationship.")
+                "and the FlowFile(s) associated with the failed document(s) 
will be sent to the \"" + REL_FAILED_DOCUMENTS.getName() + "\" relationship " +
+                "with \"elasticsearch.bulk.error\" attributes.")
         .allowableValues("true", "false")
         .defaultValue("false")
         .expressionLanguageSupported(ExpressionLanguageScope.NONE)
@@ -110,9 +112,11 @@ public class PutElasticsearchJson extends 
AbstractPutElasticsearch {
 
     static final PropertyDescriptor NOT_FOUND_IS_SUCCESSFUL = new 
PropertyDescriptor.Builder()
         .name("put-es-json-not_found-is-error")
-        .displayName("Treat \"Not Found\" as Error")
+        .displayName("Treat \"Not Found\" as Success")
         .description("If true, \"not_found\" Elasticsearch Document associated 
FlowFiles will be routed to the \"" + REL_SUCCESS.getName() +
-                "\" relationship, otherwise to the \"" + 
REL_FAILED_DOCUMENTS.getName() + "\" relationship.")
+                "\" relationship, otherwise to the \"" + 
REL_FAILED_DOCUMENTS.getName() + "\" relationship. " +
+                "If " + OUTPUT_ERROR_RESPONSES.getDisplayName() + " is 
\"true\" then \"not_found\" responses from Elasticsearch " +
+                "will be sent to the " + REL_ERROR_RESPONSES.getName() + " 
relationship")
         .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
         .allowableValues("true", "false")
         .defaultValue("true")
@@ -121,19 +125,19 @@ public class PutElasticsearchJson extends 
AbstractPutElasticsearch {
         .build();
 
     static final List<PropertyDescriptor> DESCRIPTORS = 
Collections.unmodifiableList(Arrays.asList(
-        ID_ATTRIBUTE, INDEX_OP, INDEX, TYPE, BATCH_SIZE, CHARSET, 
CLIENT_SERVICE, LOG_ERROR_RESPONSES,
-        OUTPUT_ERROR_DOCUMENTS, NOT_FOUND_IS_SUCCESSFUL
+        ID_ATTRIBUTE, INDEX_OP, INDEX, TYPE, BATCH_SIZE, CHARSET, 
CLIENT_SERVICE,
+        LOG_ERROR_RESPONSES, OUTPUT_ERROR_RESPONSES, OUTPUT_ERROR_DOCUMENTS, 
NOT_FOUND_IS_SUCCESSFUL
     ));
-    static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
-        REL_SUCCESS, REL_FAILURE, REL_RETRY, REL_FAILED_DOCUMENTS
+    static final Set<Relationship> BASE_RELATIONSHIPS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS, REL_FAILURE, REL_RETRY, REL_FAILED_DOCUMENTS
     )));
 
     private boolean outputErrors;
-    private final ObjectMapper inputMapper = new ObjectMapper();
+    private final ObjectMapper objectMapper = new ObjectMapper();
 
     @Override
-    public Set<Relationship> getRelationships() {
-        return RELATIONSHIPS;
+    Set<Relationship> getBaseRelationships() {
+        return BASE_RELATIONSHIPS;
     }
 
     @Override
@@ -141,6 +145,7 @@ public class PutElasticsearchJson extends 
AbstractPutElasticsearch {
         return DESCRIPTORS;
     }
 
+    @Override
     @OnScheduled
     public void onScheduled(final ProcessContext context) {
         super.onScheduled(context);
@@ -174,7 +179,7 @@ public class PutElasticsearchJson extends 
AbstractPutElasticsearch {
             try (final InputStream inStream = session.read(input)) {
                 final byte[] result = IOUtils.toByteArray(inStream);
                 @SuppressWarnings("unchecked")
-                final Map<String, Object> contentMap = 
inputMapper.readValue(new String(result, charset), Map.class);
+                final Map<String, Object> contentMap = 
objectMapper.readValue(new String(result, charset), Map.class);
 
                 final IndexOperationRequest.Operation o = 
IndexOperationRequest.Operation.forValue(indexOp);
                 operations.add(new IndexOperationRequest(index, type, id, 
contentMap, o));
@@ -195,7 +200,7 @@ public class PutElasticsearchJson extends 
AbstractPutElasticsearch {
 
         if (!originals.isEmpty()) {
             try {
-                final List<FlowFile> errorDocuments = 
indexDocuments(operations, originals, context);
+                final List<FlowFile> errorDocuments = 
indexDocuments(operations, originals, context, session);
                 session.transfer(errorDocuments, REL_FAILED_DOCUMENTS);
                 errorDocuments.forEach(e ->
                         session.getProvenanceReporter().send(
@@ -239,26 +244,28 @@ public class PutElasticsearchJson extends 
AbstractPutElasticsearch {
         }
     }
 
-    @SuppressWarnings("unchecked")
-    private List<FlowFile> indexDocuments(final List<IndexOperationRequest> 
operations, final List<FlowFile> originals, final ProcessContext context) 
throws JsonProcessingException {
+    private List<FlowFile> indexDocuments(final List<IndexOperationRequest> 
operations, final List<FlowFile> originals, final ProcessContext context, final 
ProcessSession session) throws IOException {
         final IndexOperationResponse response = 
clientService.get().bulk(operations, getUrlQueryParameters(context, 
originals.get(0)));
-        final List<FlowFile> errorDocuments = new 
ArrayList<>(response.getItems() == null ? 0 : response.getItems().size());
-
-        List<Predicate<Map<String, Object>>> errorItemFilters = new 
ArrayList<>(2);
-        if (response.hasErrors()) {
-            logElasticsearchDocumentErrors(response);
 
-            if (outputErrors) {
-                errorItemFilters.add(isElasticsearchError());
-            }
+        final Map<Integer, Map<String, Object>> errors = 
findElasticsearchResponseErrors(response);
+        final List<FlowFile> errorDocuments = outputErrors ? new 
ArrayList<>(errors.size()) : Collections.emptyList();
+        if (outputErrors) {
+            errors.forEach((index, error) -> {
+                String errorMessage;
+                try {
+                    errorMessage = objectMapper.writeValueAsString(error);
+                } catch (JsonProcessingException e) {
+                    errorMessage = String.format(
+                            "{\"error\": {\"type\": 
\"elasticsearch_response_parse_error\", \"reason\": \"%s\"}}",
+                            e.getMessage().replace("\"", "\\\"")
+                    );
+                }
+                errorDocuments.add(session.putAttribute(originals.get(index), 
"elasticsearch.bulk.error", errorMessage));
+            });
         }
 
-        if (!notFoundIsSuccessful) {
-            errorItemFilters.add(isElasticsearchNotFound());
-        }
-        if (!errorItemFilters.isEmpty()) {
-            findElasticsearchResponseIndices(response, 
errorItemFilters.toArray(new Predicate[0]))
-                    .forEach(index -> 
errorDocuments.add(originals.get((Integer) index)));
+        if (!errors.isEmpty()) {
+            handleElasticsearchDocumentErrors(errors, session, null);
         }
 
         return errorDocuments;
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
index d70a615915..13fb557756 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecord.java
@@ -76,13 +76,13 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Predicate;
 
 @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
 @Tags({"json", "elasticsearch", "elasticsearch5", "elasticsearch6", 
"elasticsearch7", "elasticsearch8", "put", "index", "record"})
 @CapabilityDescription("A record-aware Elasticsearch put processor that uses 
the official Elastic REST client libraries.")
 @WritesAttributes({
-        @WritesAttribute(attribute = "elasticsearch.put.error", description = 
"The error message provided by Elasticsearch if there is an error indexing the 
documents."),
+        @WritesAttribute(attribute = "elasticsearch.put.error",
+                description = "The error message if there is an issue parsing 
the FlowFile records, sending the parsed documents to Elasticsearch or parsing 
the Elasticsearch response."),
         @WritesAttribute(attribute = "elasticsearch.put.error.count", 
description = "The number of records that generated errors in the Elasticsearch 
_bulk API."),
         @WritesAttribute(attribute = "elasticsearch.put.success.count", 
description = "The number of records that were successfully processed by the 
Elasticsearch _bulk API.")
 })
@@ -207,7 +207,7 @@ public class PutElasticsearchRecord extends 
AbstractPutElasticsearch {
         .displayName("Result Record Writer")
         .description("If this configuration property is set, the response from 
Elasticsearch will be examined for failed records " +
                 "and the failed records will be written to a record set with 
this record writer service and sent to the \"" +
-                REL_FAILED_RECORDS.getName() + "\" relationship. Successful 
records will be written to a record set" +
+                REL_FAILED_RECORDS.getName() + "\" relationship. Successful 
records will be written to a record set " +
                 "with this record writer service and sent to the \"" + 
REL_SUCCESSFUL_RECORDS.getName() + "\" relationship.")
         .identifiesControllerService(RecordSetWriterFactory.class)
         .addValidator(Validator.VALID)
@@ -216,9 +216,11 @@ public class PutElasticsearchRecord extends 
AbstractPutElasticsearch {
 
     static final PropertyDescriptor NOT_FOUND_IS_SUCCESSFUL = new 
PropertyDescriptor.Builder()
         .name("put-es-record-not_found-is-error")
-        .displayName("Treat \"Not Found\" as Error")
+        .displayName("Treat \"Not Found\" as Success")
         .description("If true, \"not_found\" Elasticsearch Document associated 
Records will be routed to the \"" +
-                REL_SUCCESSFUL_RECORDS.getName() + "\" relationship, otherwise 
to the \"" + REL_FAILED_RECORDS.getName() + "\" relationship.")
+                REL_SUCCESSFUL_RECORDS.getName() + "\" relationship, otherwise 
to the \"" + REL_FAILED_RECORDS.getName() + "\" relationship. " +
+                "If " + OUTPUT_ERROR_RESPONSES.getDisplayName() + " is 
\"true\" then \"not_found\" responses from Elasticsearch " +
+                "will be sent to the " + REL_ERROR_RESPONSES.getName() + " 
relationship")
         .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
         .allowableValues("true", "false")
         .defaultValue("true")
@@ -266,10 +268,11 @@ public class PutElasticsearchRecord extends 
AbstractPutElasticsearch {
     static final List<PropertyDescriptor> DESCRIPTORS = 
Collections.unmodifiableList(Arrays.asList(
         INDEX_OP, INDEX, TYPE, AT_TIMESTAMP, CLIENT_SERVICE, RECORD_READER, 
BATCH_SIZE, ID_RECORD_PATH, RETAIN_ID_FIELD,
         INDEX_OP_RECORD_PATH, INDEX_RECORD_PATH, TYPE_RECORD_PATH, 
AT_TIMESTAMP_RECORD_PATH, RETAIN_AT_TIMESTAMP_FIELD,
-        DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT, LOG_ERROR_RESPONSES, 
RESULT_RECORD_WRITER, NOT_FOUND_IS_SUCCESSFUL
+        DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT, LOG_ERROR_RESPONSES, 
OUTPUT_ERROR_RESPONSES, RESULT_RECORD_WRITER,
+        NOT_FOUND_IS_SUCCESSFUL
     ));
-    static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
-        REL_SUCCESS, REL_FAILURE, REL_RETRY, REL_FAILED_RECORDS, 
REL_SUCCESSFUL_RECORDS
+    static final Set<Relationship> BASE_RELATIONSHIPS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
+            REL_SUCCESS, REL_FAILURE, REL_RETRY, REL_FAILED_RECORDS, 
REL_SUCCESSFUL_RECORDS
     )));
 
     private RecordPathCache recordPathCache;
@@ -281,8 +284,8 @@ public class PutElasticsearchRecord extends 
AbstractPutElasticsearch {
     private volatile String timestampFormat;
 
     @Override
-    public Set<Relationship> getRelationships() {
-        return RELATIONSHIPS;
+    Set<Relationship> getBaseRelationships() {
+        return BASE_RELATIONSHIPS;
     }
 
     @Override
@@ -290,6 +293,7 @@ public class PutElasticsearchRecord extends 
AbstractPutElasticsearch {
         return DESCRIPTORS;
     }
 
+    @Override
     @OnScheduled
     public void onScheduled(final ProcessContext context) {
         super.onScheduled(context);
@@ -460,19 +464,12 @@ public class PutElasticsearchRecord extends 
AbstractPutElasticsearch {
     private ResponseDetails indexDocuments(final BulkOperation bundle, final 
ProcessContext context, final ProcessSession session, final FlowFile input) 
throws IOException, SchemaNotFoundException {
         final IndexOperationResponse response = 
clientService.get().bulk(bundle.getOperationList(), 
getUrlQueryParameters(context, input));
 
-        List<Predicate<Map<String, Object>>> errorItemFilters = new 
ArrayList<>(2);
-        if (response.hasErrors()) {
-            logElasticsearchDocumentErrors(response);
-            errorItemFilters.add(isElasticsearchError());
-        }
-
-        if (writerFactory != null && !notFoundIsSuccessful) {
-            errorItemFilters.add(isElasticsearchNotFound());
+        final Map<Integer, Map<String, Object>> errors = 
findElasticsearchResponseErrors(response);
+        if (!errors.isEmpty()) {
+            handleElasticsearchDocumentErrors(errors, session, input);
         }
 
-        @SuppressWarnings("unchecked")
-        final List<Integer> errorIndices = 
findElasticsearchResponseIndices(response, errorItemFilters.toArray(new 
Predicate[0]));
-        final int numErrors = errorIndices.size();
+        final int numErrors = errors.size();
         final int numSuccessful = response.getItems() == null ? 0 : 
response.getItems().size() - numErrors;
         FlowFile errorFF = null;
         FlowFile successFF = null;
@@ -490,7 +487,7 @@ public class PutElasticsearchRecord extends 
AbstractPutElasticsearch {
                     errorWriter.beginRecordSet();
                     successWriter.beginRecordSet();
                     for (int o = 0; o < bundle.getOriginalRecords().size(); 
o++) {
-                        if (errorIndices.contains(o)) {
+                        if (errors.containsKey(o)) {
                             
errorWriter.write(bundle.getOriginalRecords().get(o));
                         } else {
                             
successWriter.write(bundle.getOriginalRecords().get(o));
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearchTest.groovy
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearchTest.groovy
new file mode 100644
index 0000000000..b287d42298
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/AbstractPutElasticsearchTest.groovy
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.elasticsearch
+
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.junit.jupiter.api.Test
+
+import static org.hamcrest.CoreMatchers.hasItem
+import static org.hamcrest.CoreMatchers.not
+import static org.hamcrest.MatcherAssert.assertThat
+
+abstract class AbstractPutElasticsearchTest<P extends 
AbstractPutElasticsearch> {
+    abstract P getProcessor()
+
+    @Test
+    void testOutputErrorResponsesRelationship() {
+        final TestRunner runner = createRunner()
+
+        assertThat(runner.getProcessor().getRelationships(), 
not(hasItem(AbstractPutElasticsearch.REL_ERROR_RESPONSES)))
+
+        runner.setProperty(AbstractPutElasticsearch.OUTPUT_ERROR_RESPONSES, 
"true")
+        assertThat(runner.getProcessor().getRelationships(), 
hasItem(AbstractPutElasticsearch.REL_ERROR_RESPONSES))
+
+        runner.setProperty(AbstractPutElasticsearch.OUTPUT_ERROR_RESPONSES, 
"false")
+        assertThat(runner.getProcessor().getRelationships(), 
not(hasItem(AbstractPutElasticsearch.REL_ERROR_RESPONSES)))
+    }
+
+    TestRunner createRunner() {
+        final P processor = getProcessor()
+        final TestRunner runner = TestRunners.newTestRunner(processor)
+
+        return runner
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.groovy
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.groovy
index 21068df443..e3c371eea8 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.groovy
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchJsonTest.groovy
@@ -22,20 +22,18 @@ import org.apache.nifi.elasticsearch.IndexOperationResponse
 import org.apache.nifi.processors.elasticsearch.mock.MockBulkLoadClientService
 import org.apache.nifi.provenance.ProvenanceEventType
 import org.apache.nifi.util.TestRunner
-import org.apache.nifi.util.TestRunners
 import org.junit.jupiter.api.BeforeEach
 import org.junit.jupiter.api.Test
 
 import static groovy.json.JsonOutput.prettyPrint
 import static groovy.json.JsonOutput.toJson
-
 import static org.hamcrest.CoreMatchers.containsString
 import static org.hamcrest.MatcherAssert.assertThat
 import static org.junit.jupiter.api.Assertions.assertEquals
 import static org.junit.jupiter.api.Assertions.assertThrows
 import static org.junit.jupiter.api.Assertions.assertTrue
 
-class PutElasticsearchJsonTest {
+class PutElasticsearchJsonTest extends 
AbstractPutElasticsearchTest<PutElasticsearchJson> {
     MockBulkLoadClientService clientService
     TestRunner runner
 
@@ -43,10 +41,15 @@ class PutElasticsearchJsonTest {
             [ msg: "Hello, world", from: "john.smith" ]
     ))
 
+    @Override
+    PutElasticsearchJson getProcessor() { // supplies a fresh PutElasticsearchJson for the inherited createRunner()
+        return new PutElasticsearchJson()
+    }
+
     @BeforeEach
     void setup() {
         clientService = new MockBulkLoadClientService()
-        runner   = TestRunners.newTestRunner(PutElasticsearchJson.class)
+        runner = createRunner()
 
         clientService.response = new IndexOperationResponse(1500)
 
@@ -60,6 +63,7 @@ class PutElasticsearchJsonTest {
         runner.setProperty(PutElasticsearchJson.LOG_ERROR_RESPONSES, "false")
         runner.setProperty(PutElasticsearchJson.CLIENT_SERVICE, 
"clientService")
         runner.setProperty(PutElasticsearchJson.NOT_FOUND_IS_SUCCESSFUL, 
"true")
+        runner.setProperty(PutElasticsearchJson.OUTPUT_ERROR_RESPONSES, 
"false")
         runner.enableControllerService(clientService)
 
         runner.assertValid()
@@ -98,6 +102,7 @@ class PutElasticsearchJsonTest {
         runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, retry)
         runner.assertTransferCount(PutElasticsearchJson.REL_SUCCESS, success)
         runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 
0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 
0)
 
         assertEquals(success,
                 runner.getProvenanceEvents().stream().filter({
@@ -203,6 +208,7 @@ class PutElasticsearchJsonTest {
         runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, 1)
         runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, 0)
         runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 
0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 
0)
     }
 
     @Test
@@ -212,6 +218,7 @@ class PutElasticsearchJsonTest {
         runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, 0)
         runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, 0)
         runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 
0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 
0)
     }
 
     @Test
@@ -241,7 +248,13 @@ class PutElasticsearchJsonTest {
         runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, 0)
         runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, 0)
         runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 
1)
-        
assertThat(runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS)[0].getContent(),
 containsString("20abcd"))
+        runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 
0)
+
+        def failedDoc = 
runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS)[0];
+        assertThat(failedDoc.getContent(), containsString("20abcd"))
+        failedDoc.assertAttributeExists("elasticsearch.bulk.error")
+        failedDoc.assertAttributeNotExists("elasticsearch.put.error")
+        assertThat(failedDoc.getAttribute("elasticsearch.bulk.error"), 
containsString("mapper_parsing_exception"))
         assertEquals(1,
                 runner.getProvenanceEvents().stream().filter({
                     e -> ProvenanceEventType.SEND == e.getEventType() && 
"Elasticsearch _bulk operation error" == e.getDetails()
@@ -253,6 +266,7 @@ class PutElasticsearchJsonTest {
         runner.clearProvenanceEvents()
 
         runner.setProperty(PutElasticsearchJson.NOT_FOUND_IS_SUCCESSFUL, 
"false")
+        runner.setProperty(PutElasticsearchJson.OUTPUT_ERROR_RESPONSES, "true")
 
         for (final def val : values) {
             runner.enqueue(prettyPrint(toJson(val)))
@@ -264,8 +278,24 @@ class PutElasticsearchJsonTest {
         runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, 0)
         runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, 0)
         runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 
2)
-        
assertThat(runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS)[0].getContent(),
 containsString("not_found"))
-        
assertThat(runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS)[1].getContent(),
 containsString("20abcd"))
+        runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 
1)
+
+        failedDoc = 
runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS)[0];
+        assertThat(failedDoc.getContent(), containsString("not_found"))
+        failedDoc.assertAttributeExists("elasticsearch.bulk.error")
+        failedDoc.assertAttributeNotExists("elasticsearch.put.error")
+        assertThat(failedDoc.getAttribute("elasticsearch.bulk.error"), 
containsString("not_found"))
+
+        failedDoc = 
runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILED_DOCUMENTS)[1];
+        assertThat(failedDoc.getContent(), containsString("20abcd"))
+        failedDoc.assertAttributeExists("elasticsearch.bulk.error")
+        failedDoc.assertAttributeNotExists("elasticsearch.put.error")
+        assertThat(failedDoc.getAttribute("elasticsearch.bulk.error"), 
containsString("number_format_exception"))
+
+        final String errorResponses = 
runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_ERROR_RESPONSES)[0].getContent()
+        assertThat(errorResponses, containsString("not_found"))
+        assertThat(errorResponses, containsString("For input string: 20abc"))
+
         assertEquals(2,
                 runner.getProvenanceEvents().stream().filter({
                     e -> ProvenanceEventType.SEND == e.getEventType() && 
"Elasticsearch _bulk operation error" == e.getDetails()
@@ -285,7 +315,8 @@ class PutElasticsearchJsonTest {
                 [ id: "1", field1: 'value1', field2: '20' ],
                 [ id: "2", field1: 'value1', field2: '20' ],
                 [ id: "2", field1: 'value1', field2: '20' ],
-                [ id: "3", field1: 'value1', field2: '20abcd' ]
+                [ id: "3", field1: 'value1', field2: '20abcd' ],
+                [ id: "4", field1: 'value2', field2: '30' ]
         ]
 
         for (final def val : values) {
@@ -294,10 +325,11 @@ class PutElasticsearchJsonTest {
         runner.assertValid()
         runner.run()
 
-        runner.assertTransferCount(PutElasticsearchJson.REL_SUCCESS, 4)
+        runner.assertTransferCount(PutElasticsearchJson.REL_SUCCESS, 5)
         runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, 0)
         runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, 0)
         runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 
0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 
0)
     }
 
     @Test
@@ -309,6 +341,7 @@ class PutElasticsearchJsonTest {
         runner.assertTransferCount(PutElasticsearchJson.REL_FAILURE, 1)
         runner.assertTransferCount(PutElasticsearchJson.REL_RETRY, 0)
         runner.assertTransferCount(PutElasticsearchJson.REL_FAILED_DOCUMENTS, 
0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 
0)
 
         
runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_FAILURE)[0].assertAttributeEquals(
                 "elasticsearch.put.error",
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
index 5e976a31d3..085844be3a 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-restapi-processors/src/test/groovy/org/apache/nifi/processors/elasticsearch/PutElasticsearchRecordTest.groovy
@@ -32,7 +32,6 @@ import org.apache.nifi.serialization.record.MockSchemaRegistry
 import org.apache.nifi.serialization.record.RecordFieldType
 import org.apache.nifi.util.StringUtils
 import org.apache.nifi.util.TestRunner
-import org.apache.nifi.util.TestRunners
 import org.junit.jupiter.api.BeforeEach
 import org.junit.jupiter.api.Test
 
@@ -46,9 +45,14 @@ import java.time.format.DateTimeFormatter
 
 import static groovy.json.JsonOutput.prettyPrint
 import static groovy.json.JsonOutput.toJson
-import static org.junit.jupiter.api.Assertions.*
-
-class PutElasticsearchRecordTest {
+import static org.hamcrest.CoreMatchers.containsString
+import static org.hamcrest.MatcherAssert.assertThat
+import static org.junit.jupiter.api.Assertions.assertEquals
+import static org.junit.jupiter.api.Assertions.assertNotNull
+import static org.junit.jupiter.api.Assertions.assertThrows
+import static org.junit.jupiter.api.Assertions.assertTrue
+
+class PutElasticsearchRecordTest extends 
AbstractPutElasticsearchTest<PutElasticsearchRecord> {
     private static final int DATE_YEAR = 2020
     private static final int DATE_MONTH = 11
     private static final int DATE_DAY = 27
@@ -81,12 +85,17 @@ class PutElasticsearchRecordTest {
 
     static final String flowFileContents = 
prettyPrint(toJson(flowFileContentMaps))
 
+    @Override
+    PutElasticsearchRecord getProcessor() { // supplies a fresh PutElasticsearchRecord for the inherited createRunner()
+        return new PutElasticsearchRecord()
+    }
+
     @BeforeEach
     void setup() {
         clientService = new MockBulkLoadClientService()
         registry = new MockSchemaRegistry()
         reader   = new JsonTreeReader()
-        runner   = TestRunners.newTestRunner(PutElasticsearchRecord.class)
+        runner   = createRunner()
 
         registry.addSchema("simple", AvroTypeUtil.createSchema(new 
Schema.Parser().parse(SCHEMA)))
 
@@ -141,6 +150,7 @@ class PutElasticsearchRecordTest {
         runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESS, success)
         runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 
0)
         
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 
0)
 
         if (success > 0) {
             
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_SUCCESS).forEach({
 ff ->
@@ -319,6 +329,7 @@ class PutElasticsearchRecordTest {
         runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
         runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 
0)
         
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 
0)
 
         runner.clearTransferState()
 
@@ -374,6 +385,7 @@ class PutElasticsearchRecordTest {
         runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
         runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 
0)
         
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 
0)
 
         runner.clearTransferState()
 
@@ -416,6 +428,7 @@ class PutElasticsearchRecordTest {
         runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
         runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 
0)
         
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 
0)
 
         runner.clearTransferState()
 
@@ -455,6 +468,7 @@ class PutElasticsearchRecordTest {
         runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
         runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 
0)
         
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 
0)
 
         runner.clearTransferState()
 
@@ -477,6 +491,7 @@ class PutElasticsearchRecordTest {
         runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
         runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 
0)
         
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 
0)
 
         runner.clearTransferState()
 
@@ -502,6 +517,7 @@ class PutElasticsearchRecordTest {
         runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
         runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 
0)
         
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 
0)
 
         runner.clearTransferState()
 
@@ -528,6 +544,7 @@ class PutElasticsearchRecordTest {
         runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
         runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 
0)
         
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 
0)
     }
 
     @Test
@@ -604,6 +621,7 @@ class PutElasticsearchRecordTest {
         runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
         runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 
0)
         
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 
0)
 
         runner.clearTransferState()
 
@@ -658,6 +676,7 @@ class PutElasticsearchRecordTest {
         runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
         runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 
0)
         
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 
0)
     }
 
     @Test
@@ -681,6 +700,7 @@ class PutElasticsearchRecordTest {
         runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
         runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 
0)
         
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 
0)
     }
 
     @Test
@@ -691,6 +711,7 @@ class PutElasticsearchRecordTest {
         runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
         runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 
0)
         
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 
0)
     }
 
     @Test
@@ -734,6 +755,7 @@ class PutElasticsearchRecordTest {
         runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
         runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 
1)
         
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 1)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 
0)
 
         
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS)[0]
                 
.assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, "1")
@@ -760,6 +782,7 @@ class PutElasticsearchRecordTest {
         runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
         runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 
1)
         
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 1)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 
0)
 
         
runner.getFlowFilesForRelationship(PutElasticsearchRecord.REL_FAILED_RECORDS)[0]
                 
.assertAttributeEquals(PutElasticsearchRecord.ATTR_RECORD_COUNT, "2")
@@ -776,7 +799,9 @@ class PutElasticsearchRecordTest {
         runner.clearTransferState()
         runner.clearProvenanceEvents()
 
+        // errors still counted/logged even if not outputting to the error 
relationship
         runner.removeProperty(PutElasticsearchRecord.RESULT_RECORD_WRITER)
+        runner.setProperty(PutElasticsearchRecord.OUTPUT_ERROR_RESPONSES, 
"true")
         runner.enqueue(prettyPrint(toJson(values)), [ 'schema.name': 
'errorTest' ])
         runner.assertValid()
         runner.run()
@@ -786,10 +811,15 @@ class PutElasticsearchRecordTest {
         runner.assertTransferCount(PutElasticsearchRecord.REL_RETRY, 0)
         runner.assertTransferCount(PutElasticsearchRecord.REL_FAILED_RECORDS, 
0)
         
runner.assertTransferCount(PutElasticsearchRecord.REL_SUCCESSFUL_RECORDS, 0)
+        runner.assertTransferCount(PutElasticsearchRecord.REL_ERROR_RESPONSES, 
1)
+
+        final String errorResponses = 
runner.getFlowFilesForRelationship(PutElasticsearchJson.REL_ERROR_RESPONSES)[0].getContent()
+        assertThat(errorResponses, containsString("not_found"))
+        assertThat(errorResponses, containsString("For input string: 20abc"))
 
         assertEquals(1,
                 runner.getProvenanceEvents().stream().filter({
-                    e -> ProvenanceEventType.SEND == e.getEventType() && 
e.getDetails() == "1 Elasticsearch _bulk operation batch(es) [1 error(s), 4 
success(es)]"
+                    e -> ProvenanceEventType.SEND == e.getEventType() && 
e.getDetails() == "1 Elasticsearch _bulk operation batch(es) [2 error(s), 3 
success(es)]"
                 }).count()
         )
     }
diff --git 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
index 7df9de2f07..48610e8261 100644
--- 
a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
+++ 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-test-utils/src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java
@@ -55,7 +55,7 @@ import static org.apache.http.auth.AuthScope.ANY;
 public abstract class AbstractElasticsearchITBase {
     // default Elasticsearch version should (ideally) match that in the 
nifi-elasticsearch-bundle#pom.xml for the integration-tests profile
     protected static final DockerImageName IMAGE = DockerImageName
-            .parse(System.getProperty("elasticsearch.docker.image", 
"docker.elastic.co/elasticsearch/elasticsearch:8.5.0"));
+            .parse(System.getProperty("elasticsearch.docker.image", 
"docker.elastic.co/elasticsearch/elasticsearch:8.7.0"));
     protected static final String ELASTIC_USER_PASSWORD = 
System.getProperty("elasticsearch.elastic_user.password", 
RandomStringUtils.randomAlphanumeric(10, 20));
     protected static final ElasticsearchContainer ELASTICSEARCH_CONTAINER = 
new ElasticsearchContainer(IMAGE)
             .withPassword(ELASTIC_USER_PASSWORD)
diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml 
b/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
index e1c2ec6036..c372e11120 100644
--- a/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/pom.xml
@@ -87,7 +87,7 @@ language governing permissions and limitations under the 
License. -->
             </activation>
             <properties>
                 <!-- also update the default Elasticsearch version in 
nifi-elasticsearch-test-utils#src/main/java/org/apache/nifi/elasticsearch/integration/AbstractElasticsearchITBase.java-->
-                <elasticsearch_docker_image>8.5.3</elasticsearch_docker_image>
+                <elasticsearch_docker_image>8.7.0</elasticsearch_docker_image>
                 
<elasticsearch.elastic.password>s3cret</elasticsearch.elastic.password>
             </properties>
             <build>

Reply via email to