lordgamez commented on code in PR #1349:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1349#discussion_r899133277


##########
.github/workflows/ci.yml:
##########
@@ -166,7 +166,7 @@ jobs:
           cmake -DUSE_SHARED_LIBS=ON -DCMAKE_BUILD_TYPE=Release 
-DSTRICT_GSL_CHECKS=AUDIT -DFAIL_ON_WARNINGS=ON -DENABLE_AWS=ON 
-DENABLE_AZURE=ON -DENABLE_BUSTACHE=ON -DENABLE_COAP=ON \
               -DENABLE_ENCRYPT_CONFIG=ON -DENABLE_GPS=ON -DENABLE_JNI=ON 
-DENABLE_LIBRDKAFKA=ON -DENABLE_LINTER=ON -DENABLE_MQTT=ON -DENABLE_NANOFI=ON 
-DENABLE_OPC=ON -DENABLE_OPENCV=ON \
               -DENABLE_OPENWSMAN=ON -DENABLE_OPS=ON -DENABLE_PCAP=ON 
-DENABLE_PYTHON=ON -DENABLE_SENSORS=ON -DENABLE_SFTP=ON -DENABLE_SQL=ON 
-DENABLE_SYSTEMD=ON -DENABLE_TENSORFLOW=OFF \
-              -DENABLE_USB_CAMERA=ON -DENABLE_SCRIPTING=ON 
-DENABLE_LUA_SCRIPTING=ON -DENABLE_KUBERNETES=ON -DENABLE_GCP=ON 
-DENABLE_PROCFS=ON -DCMAKE_EXPORT_COMPILE_COMMANDS=ON ..
+              -DENABLE_USB_CAMERA=ON -DENABLE_SCRIPTING=ON 
-DENABLE_LUA_SCRIPTING=ON -DENABLE_KUBERNETES=ON -DENABLE_GCP=ON 
-DENABLE_PROCFS=ON -DENABLE_ELASTICSEARCH=ON -DCMAKE_EXPORT_COMPILE_COMMANDS=ON 
..

Review Comment:
   I think we should also add it to the GCC build, as we usually release the GCC 
builds. Also, should this be built on Windows in CI as well?



##########
docker/test/integration/resources/elasticsearch/Dockerfile:
##########
@@ -0,0 +1,7 @@
+FROM elasticsearch:8.2.2
+ADD elasticsearch.yml /usr/share/elasticsearch/config/elasticsearch.yml

Review Comment:
   It's usually recommended to use the COPY command instead of ADD: 
https://github.com/hadolint/hadolint/wiki/DL3020
   The same applies to the OpenSearch Dockerfile.



##########
extensions/elasticsearch/PostElasticsearch.cpp:
##########
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include "PostElasticsearch.h"
+#include <vector>
+#include <utility>
+
+#include "ElasticsearchCredentialsControllerService.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/PropertyBuilder.h"
+#include "core/Resource.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stream.h"
+#include "rapidjson/writer.h"
+#include "utils/expected.h"
+#include "utils/JsonCallback.h"
+
+namespace org::apache::nifi::minifi::extensions::elasticsearch {
+
+const core::Relationship PostElasticsearch::Success("success", "All flowfiles 
that succeed in being transferred into Elasticsearch go here.");
+const core::Relationship PostElasticsearch::Failure("failure", "All flowfiles 
that fail for reasons unrelated to server availability go to this 
relationship.");
+const core::Relationship PostElasticsearch::Error("error", "All flowfiles that 
Elasticsearch responded to with an error go to this relationship.");
+
+const core::Property PostElasticsearch::Action = 
core::PropertyBuilder::createProperty("Action")
+    ->withDescription("The type of the operation used to index (create, 
delete, index, update, upsert)")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PostElasticsearch::MaxBatchSize = 
core::PropertyBuilder::createProperty("Max Batch Size")
+    ->withDescription("The maximum number of Syslog events to process at a 
time.")
+    ->withDefaultValue<uint64_t>(100)
+    ->build();
+
+const core::Property PostElasticsearch::ElasticCredentials = 
core::PropertyBuilder::createProperty("Elasticsearch Credentials Provider 
Service")
+    ->withDescription("The Controller Service used to obtain Elasticsearch 
credentials.")
+    ->isRequired(true)
+    ->asType<ElasticsearchCredentialsControllerService>()
+    ->build();
+
+const core::Property PostElasticsearch::SSLContext = 
core::PropertyBuilder::createProperty("SSL Context Service")
+    ->withDescription("The SSL Context Service used to provide client 
certificate "
+                      "information for TLS/SSL (https) connections.")
+    ->isRequired(false)
+    ->asType<minifi::controllers::SSLContextService>()->build();
+
+const core::Property PostElasticsearch::Hosts = 
core::PropertyBuilder::createProperty("Hosts")
+    ->withDescription("A comma-separated list of HTTP hosts that host 
Elasticsearch query nodes. Currently only supports a single host.")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PostElasticsearch::Index = 
core::PropertyBuilder::createProperty("Index")
+    ->withDescription("The name of the index to use.")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PostElasticsearch::Identifier = 
core::PropertyBuilder::createProperty("Identifier")
+    ->withDescription("If the Action is \"index\" or \"create\", this property 
may be left empty or evaluate to an empty value, "
+                      "in which case the document's identifier will be 
auto-generated by Elasticsearch. "
+                      "For all other Actions, the attribute must evaluate to a 
non-empty value.")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+
+void PostElasticsearch::initialize() {
+  setSupportedProperties(properties());
+  setSupportedRelationships(relationships());
+}
+
+namespace {
+auto getSSLContextService(core::ProcessContext& context) {
+  if (auto ssl_context = context.getProperty(PostElasticsearch::SSLContext))
+    return 
std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context.getControllerService(*ssl_context));
+  return std::shared_ptr<minifi::controllers::SSLContextService>{};
+}
+
+auto getCredentialsService(core::ProcessContext& context) {
+  if (auto credentials = 
context.getProperty(PostElasticsearch::ElasticCredentials))
+    return 
std::dynamic_pointer_cast<ElasticsearchCredentialsControllerService>(context.getControllerService(*credentials));
+  return std::shared_ptr<ElasticsearchCredentialsControllerService>{};
+}
+}  // namespace
+
+void PostElasticsearch::onSchedule(const 
std::shared_ptr<core::ProcessContext>& context, const 
std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context);
+
+  context->getProperty(MaxBatchSize.getName(), max_batch_size_);
+  if (max_batch_size_ < 1)
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Max Batch Size property is 
invalid");
+
+  std::string host_url{};
+  if (auto hosts_str = context->getProperty(Hosts)) {
+    auto hosts = utils::StringUtils::split(*hosts_str, ",");
+    if (hosts.size() > 1)
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multiple hosts not yet 
supported");
+    host_url = hosts[0];
+  } else {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid hosts");
+  }
+
+  auto credentials_service = getCredentialsService(*context);
+  if (!credentials_service)
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing Elasticsearch 
credentials service");
+
+  client_.initialize("POST", host_url + "/_bulk", 
getSSLContextService(*context));
+  client_.setContentType("application/json");
+  credentials_service->authenticateClient(client_);
+}
+
+namespace {
+
+class ElasticPayload {
+ public:
+  [[nodiscard]] std::string toString() const {
+    auto result = headerString();
+    if (payload_) {
+      rapidjson::StringBuffer payload_buffer;
+      rapidjson::Writer<rapidjson::StringBuffer> 
payload_writer(payload_buffer);
+      payload_->Accept(payload_writer);
+      result = result + std::string("\n") + payload_buffer.GetString();
+    }
+    return result;
+  }
+
+  static auto parse(core::ProcessSession& session, core::ProcessContext& 
context, const std::shared_ptr<core::FlowFile>& flow_file) -> 
nonstd::expected<ElasticPayload, std::string> {
+    auto action = context.getProperty(PostElasticsearch::Action, flow_file);
+    if (!action || (action != "index" && action != "create" && action != 
"delete" && action != "update" && action != "upsert"))
+      return nonstd::make_unexpected("Missing or invalid action");
+
+    auto index = context.getProperty(PostElasticsearch::Index, flow_file);
+    if (!index)
+      return nonstd::make_unexpected("Missing index");
+
+    auto id = context.getProperty(PostElasticsearch::Identifier, flow_file);
+    if (!id && (action == "delete" || action == "update" || action == 
"upsert"))
+      return nonstd::make_unexpected("Identifier is required for DELETE,UPDATE 
and UPSERT actions");
+
+    std::optional<rapidjson::Document> payload;
+    if (action == "index" || action == "create") {
+      payload = rapidjson::Document(rapidjson::kObjectType);
+      utils::JsonInputCallback callback(*payload);
+      if (session.read(flow_file, std::ref(callback)) < 0) {
+        return nonstd::make_unexpected("invalid flowfile content");
+      }
+    }
+    if (action == "update" || action == "upsert") {
+      payload = rapidjson::Document(rapidjson::kObjectType);
+      rapidjson::Document doc_member(rapidjson::kObjectType, 
&payload->GetAllocator());
+      utils::JsonInputCallback callback(doc_member);
+      if (session.read(flow_file, std::ref(callback)) < 0) {
+        return nonstd::make_unexpected("invalid flowfile content");
+      }
+      if (action == "upsert") {
+        action = "update";
+        doc_member.AddMember("doc_as_upsert", true, doc_member.GetAllocator());
+      }
+      payload->AddMember("doc", doc_member, payload->GetAllocator());
+    }
+    return ElasticPayload(std::move(*action), std::move(*index), 
std::move(id), std::move(payload));
+  }
+
+ private:
+  ElasticPayload(std::string operation,
+                 std::string index,
+                 std::optional<std::string> id,
+                 std::optional<rapidjson::Document> payload) :
+      operation_(std::move(operation)),
+      index_(std::move(index)),
+      id_(std::move(id)),
+      payload_(std::move(payload)) {
+  }
+
+  [[nodiscard]] std::string headerString() const {
+    rapidjson::Document first_line = 
rapidjson::Document(rapidjson::kObjectType);
+
+    auto operation_index_key = rapidjson::Value(operation_.data(), 
operation_.size(), first_line.GetAllocator());
+    first_line.AddMember(operation_index_key, 
rapidjson::Value{rapidjson::kObjectType}, first_line.GetAllocator());
+    auto& operation_request = first_line[operation_.c_str()];
+
+    auto index_json = rapidjson::Value(index_.data(), index_.size(), 
first_line.GetAllocator());
+    operation_request.AddMember("_index", index_json, 
first_line.GetAllocator());
+
+    if (id_) {
+      auto id_json = rapidjson::Value(id_->data(), id_->size(), 
first_line.GetAllocator());
+      operation_request.AddMember("_id", id_json, first_line.GetAllocator());
+    }
+
+    rapidjson::StringBuffer buffer;
+    rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+    first_line.Accept(writer);
+
+    return buffer.GetString();
+  }
+
+  std::string operation_;
+  std::string index_;
+  std::optional<std::string> id_;
+  std::optional<rapidjson::Document> payload_;
+};
+
+auto submitRequest(utils::HTTPClient& client, const size_t expected_items) -> 
nonstd::expected<rapidjson::Document, std::string> {
+  if (!client.submit())
+    return nonstd::make_unexpected("Submit failed");
+  auto response_code = client.getResponseCode();
+  if (response_code != 200)
+    return nonstd::make_unexpected("Error occurred: " + 
std::to_string(response_code) + ", " + client.getResponseBody().data());
+  rapidjson::Document response;
+  rapidjson::ParseResult parse_result = 
response.Parse<rapidjson::kParseStopWhenDoneFlag>(client.getResponseBody().data());
+  if (parse_result.IsError())
+    return nonstd::make_unexpected("Response is not valid json");
+  if (!response.HasMember("items"))
+    return nonstd::make_unexpected("Response is invalid");
+  if (response["items"].Size() != expected_items)
+    return nonstd::make_unexpected("The number of responses dont match the 
number of requests");
+
+  return response;
+}
+
+void addAttributesFromResponse(std::string name, 
rapidjson::Value::ConstMemberIterator object, core::FlowFile& flow_file) {
+  name = name + "." + object->name.GetString();
+
+  if (object->value.IsObject()) {
+    for (auto it = object->value.MemberBegin(); it != 
object->value.MemberEnd(); ++it) {
+      addAttributesFromResponse(name, it, flow_file);
+    }
+  } else if (object->value.IsInt64()) {
+    flow_file.addAttribute(name, std::to_string(object->value.GetInt64()));
+  } else if (object->value.IsString()) {
+    flow_file.addAttribute(name, object->value.GetString());
+  } else if (object->value.IsBool()) {
+    flow_file.addAttribute(name, std::to_string(object->value.GetBool()));
+  }
+}
+}  // namespace
+
+void PostElasticsearch::onTrigger(const std::shared_ptr<core::ProcessContext>& 
context, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(context && session && max_batch_size_ > 0);
+  std::stringstream payload;
+  std::vector<std::shared_ptr<core::FlowFile>> flowfiles_in_payload;
+  for (size_t flow_files_processed = 0; flow_files_processed < 
max_batch_size_; ++flow_files_processed) {
+    auto flow_file = session->get();
+    if (!flow_file)
+      break;
+    auto elastic_payload = ElasticPayload::parse(*session, *context, 
flow_file);
+    if (!elastic_payload) {
+      logger_->log_error(elastic_payload.error().c_str());
+      session->transfer(flow_file, Failure);
+      continue;
+    }
+
+    payload << elastic_payload->toString() << "\n";
+    flowfiles_in_payload.push_back(flow_file);
+  }
+
+  if (flowfiles_in_payload.empty()) {
+    yield();
+    return;
+  }
+
+
+  client_.setPostFields(payload.str());
+  auto result = submitRequest(client_, flowfiles_in_payload.size());
+  if (!result) {
+    logger_->log_error(result.error().c_str());
+    for (const auto& flow_file_in_payload: flowfiles_in_payload)
+      session->transfer(flow_file_in_payload, Failure);
+    return;
+  }
+
+  auto& items = result->operator[]("items");

Review Comment:
   Could you use `(*result)["items"]` here to make it simpler? Also, is "items" 
always available? If so, maybe an assertion could be added; if not, a check.



##########
extensions/elasticsearch/PostElasticsearch.cpp:
##########
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include "PostElasticsearch.h"
+#include <vector>
+#include <utility>
+
+#include "ElasticsearchCredentialsControllerService.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/PropertyBuilder.h"
+#include "core/Resource.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stream.h"
+#include "rapidjson/writer.h"
+#include "utils/expected.h"
+#include "utils/JsonCallback.h"
+
+namespace org::apache::nifi::minifi::extensions::elasticsearch {
+
+const core::Relationship PostElasticsearch::Success("success", "All flowfiles 
that succeed in being transferred into Elasticsearch go here.");
+const core::Relationship PostElasticsearch::Failure("failure", "All flowfiles 
that fail for reasons unrelated to server availability go to this 
relationship.");
+const core::Relationship PostElasticsearch::Error("error", "All flowfiles that 
Elasticsearch responded to with an error go to this relationship.");
+
+const core::Property PostElasticsearch::Action = 
core::PropertyBuilder::createProperty("Action")
+    ->withDescription("The type of the operation used to index (create, 
delete, index, update, upsert)")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PostElasticsearch::MaxBatchSize = 
core::PropertyBuilder::createProperty("Max Batch Size")
+    ->withDescription("The maximum number of Syslog events to process at a 
time.")
+    ->withDefaultValue<uint64_t>(100)
+    ->build();
+
+const core::Property PostElasticsearch::ElasticCredentials = 
core::PropertyBuilder::createProperty("Elasticsearch Credentials Provider 
Service")
+    ->withDescription("The Controller Service used to obtain Elasticsearch 
credentials.")
+    ->isRequired(true)
+    ->asType<ElasticsearchCredentialsControllerService>()
+    ->build();
+
+const core::Property PostElasticsearch::SSLContext = 
core::PropertyBuilder::createProperty("SSL Context Service")
+    ->withDescription("The SSL Context Service used to provide client 
certificate "
+                      "information for TLS/SSL (https) connections.")
+    ->isRequired(false)
+    ->asType<minifi::controllers::SSLContextService>()->build();
+
+const core::Property PostElasticsearch::Hosts = 
core::PropertyBuilder::createProperty("Hosts")
+    ->withDescription("A comma-separated list of HTTP hosts that host 
Elasticsearch query nodes. Currently only supports a single host.")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PostElasticsearch::Index = 
core::PropertyBuilder::createProperty("Index")
+    ->withDescription("The name of the index to use.")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PostElasticsearch::Identifier = 
core::PropertyBuilder::createProperty("Identifier")
+    ->withDescription("If the Action is \"index\" or \"create\", this property 
may be left empty or evaluate to an empty value, "
+                      "in which case the document's identifier will be 
auto-generated by Elasticsearch. "
+                      "For all other Actions, the attribute must evaluate to a 
non-empty value.")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+
+void PostElasticsearch::initialize() {
+  setSupportedProperties(properties());
+  setSupportedRelationships(relationships());
+}
+
+namespace {
+auto getSSLContextService(core::ProcessContext& context) {
+  if (auto ssl_context = context.getProperty(PostElasticsearch::SSLContext))
+    return 
std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context.getControllerService(*ssl_context));
+  return std::shared_ptr<minifi::controllers::SSLContextService>{};
+}
+
+auto getCredentialsService(core::ProcessContext& context) {
+  if (auto credentials = 
context.getProperty(PostElasticsearch::ElasticCredentials))
+    return 
std::dynamic_pointer_cast<ElasticsearchCredentialsControllerService>(context.getControllerService(*credentials));
+  return std::shared_ptr<ElasticsearchCredentialsControllerService>{};
+}
+}  // namespace
+
+void PostElasticsearch::onSchedule(const 
std::shared_ptr<core::ProcessContext>& context, const 
std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context);
+
+  context->getProperty(MaxBatchSize.getName(), max_batch_size_);
+  if (max_batch_size_ < 1)
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Max Batch Size property is 
invalid");
+
+  std::string host_url{};
+  if (auto hosts_str = context->getProperty(Hosts)) {
+    auto hosts = utils::StringUtils::split(*hosts_str, ",");
+    if (hosts.size() > 1)
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multiple hosts not yet 
supported");
+    host_url = hosts[0];
+  } else {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid hosts");
+  }
+
+  auto credentials_service = getCredentialsService(*context);
+  if (!credentials_service)
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing Elasticsearch 
credentials service");
+
+  client_.initialize("POST", host_url + "/_bulk", 
getSSLContextService(*context));
+  client_.setContentType("application/json");
+  credentials_service->authenticateClient(client_);
+}
+
+namespace {
+
+class ElasticPayload {
+ public:
+  [[nodiscard]] std::string toString() const {
+    auto result = headerString();
+    if (payload_) {
+      rapidjson::StringBuffer payload_buffer;
+      rapidjson::Writer<rapidjson::StringBuffer> 
payload_writer(payload_buffer);
+      payload_->Accept(payload_writer);
+      result = result + std::string("\n") + payload_buffer.GetString();
+    }
+    return result;
+  }
+
+  static auto parse(core::ProcessSession& session, core::ProcessContext& 
context, const std::shared_ptr<core::FlowFile>& flow_file) -> 
nonstd::expected<ElasticPayload, std::string> {
+    auto action = context.getProperty(PostElasticsearch::Action, flow_file);
+    if (!action || (action != "index" && action != "create" && action != 
"delete" && action != "update" && action != "upsert"))
+      return nonstd::make_unexpected("Missing or invalid action");
+
+    auto index = context.getProperty(PostElasticsearch::Index, flow_file);
+    if (!index)
+      return nonstd::make_unexpected("Missing index");
+
+    auto id = context.getProperty(PostElasticsearch::Identifier, flow_file);
+    if (!id && (action == "delete" || action == "update" || action == 
"upsert"))
+      return nonstd::make_unexpected("Identifier is required for DELETE,UPDATE 
and UPSERT actions");
+
+    std::optional<rapidjson::Document> payload;
+    if (action == "index" || action == "create") {
+      payload = rapidjson::Document(rapidjson::kObjectType);
+      utils::JsonInputCallback callback(*payload);
+      if (session.read(flow_file, std::ref(callback)) < 0) {
+        return nonstd::make_unexpected("invalid flowfile content");
+      }
+    }
+    if (action == "update" || action == "upsert") {
+      payload = rapidjson::Document(rapidjson::kObjectType);
+      rapidjson::Document doc_member(rapidjson::kObjectType, 
&payload->GetAllocator());
+      utils::JsonInputCallback callback(doc_member);
+      if (session.read(flow_file, std::ref(callback)) < 0) {
+        return nonstd::make_unexpected("invalid flowfile content");
+      }
+      if (action == "upsert") {
+        action = "update";
+        doc_member.AddMember("doc_as_upsert", true, doc_member.GetAllocator());
+      }
+      payload->AddMember("doc", doc_member, payload->GetAllocator());
+    }
+    return ElasticPayload(std::move(*action), std::move(*index), 
std::move(id), std::move(payload));
+  }
+
+ private:
+  ElasticPayload(std::string operation,
+                 std::string index,
+                 std::optional<std::string> id,
+                 std::optional<rapidjson::Document> payload) :
+      operation_(std::move(operation)),
+      index_(std::move(index)),
+      id_(std::move(id)),
+      payload_(std::move(payload)) {
+  }
+
+  [[nodiscard]] std::string headerString() const {
+    rapidjson::Document first_line = 
rapidjson::Document(rapidjson::kObjectType);
+
+    auto operation_index_key = rapidjson::Value(operation_.data(), 
operation_.size(), first_line.GetAllocator());
+    first_line.AddMember(operation_index_key, 
rapidjson::Value{rapidjson::kObjectType}, first_line.GetAllocator());
+    auto& operation_request = first_line[operation_.c_str()];
+
+    auto index_json = rapidjson::Value(index_.data(), index_.size(), 
first_line.GetAllocator());
+    operation_request.AddMember("_index", index_json, 
first_line.GetAllocator());
+
+    if (id_) {
+      auto id_json = rapidjson::Value(id_->data(), id_->size(), 
first_line.GetAllocator());
+      operation_request.AddMember("_id", id_json, first_line.GetAllocator());
+    }
+
+    rapidjson::StringBuffer buffer;
+    rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+    first_line.Accept(writer);
+
+    return buffer.GetString();
+  }
+
+  std::string operation_;
+  std::string index_;
+  std::optional<std::string> id_;
+  std::optional<rapidjson::Document> payload_;
+};
+
+auto submitRequest(utils::HTTPClient& client, const size_t expected_items) -> 
nonstd::expected<rapidjson::Document, std::string> {
+  if (!client.submit())
+    return nonstd::make_unexpected("Submit failed");
+  auto response_code = client.getResponseCode();
+  if (response_code != 200)
+    return nonstd::make_unexpected("Error occurred: " + 
std::to_string(response_code) + ", " + client.getResponseBody().data());
+  rapidjson::Document response;
+  rapidjson::ParseResult parse_result = 
response.Parse<rapidjson::kParseStopWhenDoneFlag>(client.getResponseBody().data());
+  if (parse_result.IsError())
+    return nonstd::make_unexpected("Response is not valid json");
+  if (!response.HasMember("items"))
+    return nonstd::make_unexpected("Response is invalid");
+  if (response["items"].Size() != expected_items)
+    return nonstd::make_unexpected("The number of responses dont match the 
number of requests");
+
+  return response;
+}
+
+void addAttributesFromResponse(std::string name, 
rapidjson::Value::ConstMemberIterator object, core::FlowFile& flow_file) {
+  name = name + "." + object->name.GetString();
+
+  if (object->value.IsObject()) {
+    for (auto it = object->value.MemberBegin(); it != 
object->value.MemberEnd(); ++it) {
+      addAttributesFromResponse(name, it, flow_file);
+    }
+  } else if (object->value.IsInt64()) {
+    flow_file.addAttribute(name, std::to_string(object->value.GetInt64()));
+  } else if (object->value.IsString()) {
+    flow_file.addAttribute(name, object->value.GetString());
+  } else if (object->value.IsBool()) {
+    flow_file.addAttribute(name, std::to_string(object->value.GetBool()));
+  }
+}
+}  // namespace
+
+void PostElasticsearch::onTrigger(const std::shared_ptr<core::ProcessContext>& 
context, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(context && session && max_batch_size_ > 0);
+  std::stringstream payload;
+  std::vector<std::shared_ptr<core::FlowFile>> flowfiles_in_payload;

Review Comment:
   Maybe it's just my understanding, but it seems to me that 
`flowfiles_with_payload` would be a better name.



##########
docker/test/integration/features/elasticsearch.feature:
##########
@@ -0,0 +1,96 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+Feature: Managing documents on Elasticsearch with PostElasticsearch
+
+  Background:
+    Given the content of "/tmp/output" is monitored
+
+  @no-ci  # Elasticsearch container requires more RAM than what the CI 
environment has

Review Comment:
   Could this be a Feature-level tag instead?



##########
extensions/elasticsearch/ElasticsearchCredentialsControllerService.cpp:
##########
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <utility>
+
+#include "ElasticsearchCredentialsControllerService.h"
+#include "core/Resource.h"
+#include "core/PropertyBuilder.h"
+
+namespace org::apache::nifi::minifi::extensions::elasticsearch {
+const core::Property ElasticsearchCredentialsControllerService::Username = 
core::PropertyBuilder::createProperty("Username")
+    ->withDescription("The username for basic authentication")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property ElasticsearchCredentialsControllerService::Password = 
core::PropertyBuilder::createProperty("Password")
+    ->withDescription("The password for basic authentication")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property ElasticsearchCredentialsControllerService::ApiKey = 
core::PropertyBuilder::createProperty("API Key")
+    ->withDescription("The API Key to use")
+    ->build();
+
+
+void ElasticsearchCredentialsControllerService::initialize() {
+  setSupportedProperties(properties());
+}
+
+void ElasticsearchCredentialsControllerService::onEnable() {
+  getProperty(ApiKey.getName(), api_key_);
+  std::string username, password;
+  getProperty(Username.getName(), username);
+  getProperty(Password.getName(), password);
+  if (!username.empty() && !password.empty())
+    username_password_.emplace(std::move(username), std::move(password));
+  if (!api_key_ && !username_password_)
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Either an API Key or Username 
and Password must be provided");

Review Comment:
   Should we maybe throw an exception if both are set, or should it prioritize 
between the API key and basic authentication? In the latter case, maybe we 
should note it in the description.



##########
extensions/elasticsearch/PostElasticsearch.cpp:
##########
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include "PostElasticsearch.h"
+#include <vector>
+#include <utility>
+
+#include "ElasticsearchCredentialsControllerService.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/PropertyBuilder.h"
+#include "core/Resource.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stream.h"
+#include "rapidjson/writer.h"
+#include "utils/expected.h"
+#include "utils/JsonCallback.h"
+
+namespace org::apache::nifi::minifi::extensions::elasticsearch {
+
+const core::Relationship PostElasticsearch::Success("success", "All flowfiles 
that succeed in being transferred into Elasticsearch go here.");
+const core::Relationship PostElasticsearch::Failure("failure", "All flowfiles 
that fail for reasons unrelated to server availability go to this 
relationship.");
+const core::Relationship PostElasticsearch::Error("error", "All flowfiles that 
Elasticsearch responded to with an error go to this relationship.");
+
+const core::Property PostElasticsearch::Action = 
core::PropertyBuilder::createProperty("Action")
+    ->withDescription("The type of the operation used to index (create, 
delete, index, update, upsert)")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PostElasticsearch::MaxBatchSize = 
core::PropertyBuilder::createProperty("Max Batch Size")
+    ->withDescription("The maximum number of Syslog events to process at a 
time.")
+    ->withDefaultValue<uint64_t>(100)
+    ->build();
+
+const core::Property PostElasticsearch::ElasticCredentials = 
core::PropertyBuilder::createProperty("Elasticsearch Credentials Provider 
Service")
+    ->withDescription("The Controller Service used to obtain Elasticsearch 
credentials.")
+    ->isRequired(true)
+    ->asType<ElasticsearchCredentialsControllerService>()
+    ->build();
+
+const core::Property PostElasticsearch::SSLContext = 
core::PropertyBuilder::createProperty("SSL Context Service")
+    ->withDescription("The SSL Context Service used to provide client 
certificate "
+                      "information for TLS/SSL (https) connections.")
+    ->isRequired(false)
+    ->asType<minifi::controllers::SSLContextService>()->build();
+
+const core::Property PostElasticsearch::Hosts = 
core::PropertyBuilder::createProperty("Hosts")
+    ->withDescription("A comma-separated list of HTTP hosts that host 
Elasticsearch query nodes. Currently only supports a single host.")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PostElasticsearch::Index = 
core::PropertyBuilder::createProperty("Index")
+    ->withDescription("The name of the index to use.")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PostElasticsearch::Identifier = 
core::PropertyBuilder::createProperty("Identifier")
+    ->withDescription("If the Action is \"index\" or \"create\", this property 
may be left empty or evaluate to an empty value, "
+                      "in which case the document's identifier will be 
auto-generated by Elasticsearch. "
+                      "For all other Actions, the attribute must evaluate to a 
non-empty value.")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+
+void PostElasticsearch::initialize() {
+  setSupportedProperties(properties());
+  setSupportedRelationships(relationships());
+}
+
+namespace {
+auto getSSLContextService(core::ProcessContext& context) {
+  if (auto ssl_context = context.getProperty(PostElasticsearch::SSLContext))
+    return 
std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context.getControllerService(*ssl_context));
+  return std::shared_ptr<minifi::controllers::SSLContextService>{};
+}
+
+auto getCredentialsService(core::ProcessContext& context) {
+  if (auto credentials = 
context.getProperty(PostElasticsearch::ElasticCredentials))
+    return 
std::dynamic_pointer_cast<ElasticsearchCredentialsControllerService>(context.getControllerService(*credentials));
+  return std::shared_ptr<ElasticsearchCredentialsControllerService>{};
+}
+}  // namespace
+
+void PostElasticsearch::onSchedule(const 
std::shared_ptr<core::ProcessContext>& context, const 
std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context);
+
+  context->getProperty(MaxBatchSize.getName(), max_batch_size_);
+  if (max_batch_size_ < 1)
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Max Batch Size property is 
invalid");
+
+  std::string host_url{};
+  if (auto hosts_str = context->getProperty(Hosts)) {
+    auto hosts = utils::StringUtils::split(*hosts_str, ",");
+    if (hosts.size() > 1)
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multiple hosts not yet 
supported");
+    host_url = hosts[0];
+  } else {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid hosts");
+  }
+
+  auto credentials_service = getCredentialsService(*context);
+  if (!credentials_service)
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing Elasticsearch 
credentials service");
+
+  client_.initialize("POST", host_url + "/_bulk", 
getSSLContextService(*context));
+  client_.setContentType("application/json");
+  credentials_service->authenticateClient(client_);
+}
+
+namespace {
+
+class ElasticPayload {
+ public:
+  [[nodiscard]] std::string toString() const {
+    auto result = headerString();
+    if (payload_) {
+      rapidjson::StringBuffer payload_buffer;
+      rapidjson::Writer<rapidjson::StringBuffer> 
payload_writer(payload_buffer);
+      payload_->Accept(payload_writer);
+      result = result + std::string("\n") + payload_buffer.GetString();
+    }
+    return result;
+  }
+
+  static auto parse(core::ProcessSession& session, core::ProcessContext& 
context, const std::shared_ptr<core::FlowFile>& flow_file) -> 
nonstd::expected<ElasticPayload, std::string> {
+    auto action = context.getProperty(PostElasticsearch::Action, flow_file);
+    if (!action || (action != "index" && action != "create" && action != 
"delete" && action != "update" && action != "upsert"))
+      return nonstd::make_unexpected("Missing or invalid action");
+
+    auto index = context.getProperty(PostElasticsearch::Index, flow_file);
+    if (!index)
+      return nonstd::make_unexpected("Missing index");
+
+    auto id = context.getProperty(PostElasticsearch::Identifier, flow_file);
+    if (!id && (action == "delete" || action == "update" || action == 
"upsert"))
+      return nonstd::make_unexpected("Identifier is required for DELETE,UPDATE 
and UPSERT actions");
+
+    std::optional<rapidjson::Document> payload;
+    if (action == "index" || action == "create") {
+      payload = rapidjson::Document(rapidjson::kObjectType);
+      utils::JsonInputCallback callback(*payload);
+      if (session.read(flow_file, std::ref(callback)) < 0) {
+        return nonstd::make_unexpected("invalid flowfile content");
+      }
+    }
+    if (action == "update" || action == "upsert") {
+      payload = rapidjson::Document(rapidjson::kObjectType);
+      rapidjson::Document doc_member(rapidjson::kObjectType, 
&payload->GetAllocator());
+      utils::JsonInputCallback callback(doc_member);
+      if (session.read(flow_file, std::ref(callback)) < 0) {
+        return nonstd::make_unexpected("invalid flowfile content");
+      }
+      if (action == "upsert") {
+        action = "update";
+        doc_member.AddMember("doc_as_upsert", true, doc_member.GetAllocator());
+      }
+      payload->AddMember("doc", doc_member, payload->GetAllocator());
+    }
+    return ElasticPayload(std::move(*action), std::move(*index), 
std::move(id), std::move(payload));
+  }
+
+ private:
+  ElasticPayload(std::string operation,
+                 std::string index,
+                 std::optional<std::string> id,
+                 std::optional<rapidjson::Document> payload) :
+      operation_(std::move(operation)),
+      index_(std::move(index)),
+      id_(std::move(id)),
+      payload_(std::move(payload)) {
+  }
+
+  [[nodiscard]] std::string headerString() const {
+    rapidjson::Document first_line = 
rapidjson::Document(rapidjson::kObjectType);
+
+    auto operation_index_key = rapidjson::Value(operation_.data(), 
operation_.size(), first_line.GetAllocator());
+    first_line.AddMember(operation_index_key, 
rapidjson::Value{rapidjson::kObjectType}, first_line.GetAllocator());
+    auto& operation_request = first_line[operation_.c_str()];
+
+    auto index_json = rapidjson::Value(index_.data(), index_.size(), 
first_line.GetAllocator());
+    operation_request.AddMember("_index", index_json, 
first_line.GetAllocator());
+
+    if (id_) {
+      auto id_json = rapidjson::Value(id_->data(), id_->size(), 
first_line.GetAllocator());
+      operation_request.AddMember("_id", id_json, first_line.GetAllocator());
+    }
+
+    rapidjson::StringBuffer buffer;
+    rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+    first_line.Accept(writer);
+
+    return buffer.GetString();
+  }
+
+  std::string operation_;
+  std::string index_;
+  std::optional<std::string> id_;
+  std::optional<rapidjson::Document> payload_;
+};
+
+auto submitRequest(utils::HTTPClient& client, const size_t expected_items) -> 
nonstd::expected<rapidjson::Document, std::string> {
+  if (!client.submit())
+    return nonstd::make_unexpected("Submit failed");
+  auto response_code = client.getResponseCode();
+  if (response_code != 200)
+    return nonstd::make_unexpected("Error occurred: " + 
std::to_string(response_code) + ", " + client.getResponseBody().data());
+  rapidjson::Document response;
+  rapidjson::ParseResult parse_result = 
response.Parse<rapidjson::kParseStopWhenDoneFlag>(client.getResponseBody().data());
+  if (parse_result.IsError())
+    return nonstd::make_unexpected("Response is not valid json");
+  if (!response.HasMember("items"))
+    return nonstd::make_unexpected("Response is invalid");
+  if (response["items"].Size() != expected_items)
+    return nonstd::make_unexpected("The number of responses dont match the 
number of requests");
+
+  return response;
+}
+
+void addAttributesFromResponse(std::string name, 
rapidjson::Value::ConstMemberIterator object, core::FlowFile& flow_file) {
+  name = name + "." + object->name.GetString();
+
+  if (object->value.IsObject()) {
+    for (auto it = object->value.MemberBegin(); it != 
object->value.MemberEnd(); ++it) {
+      addAttributesFromResponse(name, it, flow_file);
+    }
+  } else if (object->value.IsInt64()) {
+    flow_file.addAttribute(name, std::to_string(object->value.GetInt64()));
+  } else if (object->value.IsString()) {
+    flow_file.addAttribute(name, object->value.GetString());
+  } else if (object->value.IsBool()) {
+    flow_file.addAttribute(name, std::to_string(object->value.GetBool()));
+  }

Review Comment:
   Should we handle the remaining JSON types (e.g. doubles, arrays, null) somehow, or at least log a message when a value is skipped?



##########
docker/test/integration/features/opensearch.feature:
##########
@@ -0,0 +1,100 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+Feature: PostElasticsearch works on Opensearch (Opensearch doesnt support API 
Keys)
+
+  Background:
+    Given the content of "/tmp/output" is monitored
+
+  @no-ci  # Opensearch container requires more RAM than what the CI 
environment has

Review Comment:
   Could this be a Feature-level tag instead?



##########
extensions/elasticsearch/PostElasticsearch.cpp:
##########
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include "PostElasticsearch.h"
+#include <vector>
+#include <utility>
+
+#include "ElasticsearchCredentialsControllerService.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/PropertyBuilder.h"
+#include "core/Resource.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stream.h"
+#include "rapidjson/writer.h"
+#include "utils/expected.h"
+#include "utils/JsonCallback.h"
+
+namespace org::apache::nifi::minifi::extensions::elasticsearch {
+
+const core::Relationship PostElasticsearch::Success("success", "All flowfiles 
that succeed in being transferred into Elasticsearch go here.");
+const core::Relationship PostElasticsearch::Failure("failure", "All flowfiles 
that fail for reasons unrelated to server availability go to this 
relationship.");
+const core::Relationship PostElasticsearch::Error("error", "All flowfiles that 
Elasticsearch responded to with an error go to this relationship.");
+
+const core::Property PostElasticsearch::Action = 
core::PropertyBuilder::createProperty("Action")
+    ->withDescription("The type of the operation used to index (create, 
delete, index, update, upsert)")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PostElasticsearch::MaxBatchSize = 
core::PropertyBuilder::createProperty("Max Batch Size")
+    ->withDescription("The maximum number of Syslog events to process at a 
time.")
+    ->withDefaultValue<uint64_t>(100)
+    ->build();
+
+const core::Property PostElasticsearch::ElasticCredentials = 
core::PropertyBuilder::createProperty("Elasticsearch Credentials Provider 
Service")
+    ->withDescription("The Controller Service used to obtain Elasticsearch 
credentials.")
+    ->isRequired(true)
+    ->asType<ElasticsearchCredentialsControllerService>()
+    ->build();
+
+const core::Property PostElasticsearch::SSLContext = 
core::PropertyBuilder::createProperty("SSL Context Service")
+    ->withDescription("The SSL Context Service used to provide client 
certificate "
+                      "information for TLS/SSL (https) connections.")
+    ->isRequired(false)
+    ->asType<minifi::controllers::SSLContextService>()->build();
+
+const core::Property PostElasticsearch::Hosts = 
core::PropertyBuilder::createProperty("Hosts")
+    ->withDescription("A comma-separated list of HTTP hosts that host 
Elasticsearch query nodes. Currently only supports a single host.")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PostElasticsearch::Index = 
core::PropertyBuilder::createProperty("Index")
+    ->withDescription("The name of the index to use.")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PostElasticsearch::Identifier = 
core::PropertyBuilder::createProperty("Identifier")
+    ->withDescription("If the Action is \"index\" or \"create\", this property 
may be left empty or evaluate to an empty value, "
+                      "in which case the document's identifier will be 
auto-generated by Elasticsearch. "
+                      "For all other Actions, the attribute must evaluate to a 
non-empty value.")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+
+void PostElasticsearch::initialize() {
+  setSupportedProperties(properties());
+  setSupportedRelationships(relationships());
+}
+
+namespace {
+auto getSSLContextService(core::ProcessContext& context) {
+  if (auto ssl_context = context.getProperty(PostElasticsearch::SSLContext))
+    return 
std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context.getControllerService(*ssl_context));
+  return std::shared_ptr<minifi::controllers::SSLContextService>{};
+}
+
+auto getCredentialsService(core::ProcessContext& context) {
+  if (auto credentials = 
context.getProperty(PostElasticsearch::ElasticCredentials))
+    return 
std::dynamic_pointer_cast<ElasticsearchCredentialsControllerService>(context.getControllerService(*credentials));
+  return std::shared_ptr<ElasticsearchCredentialsControllerService>{};
+}
+}  // namespace
+
+void PostElasticsearch::onSchedule(const 
std::shared_ptr<core::ProcessContext>& context, const 
std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context);
+
+  context->getProperty(MaxBatchSize.getName(), max_batch_size_);
+  if (max_batch_size_ < 1)
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Max Batch Size property is 
invalid");
+
+  std::string host_url{};
+  if (auto hosts_str = context->getProperty(Hosts)) {
+    auto hosts = utils::StringUtils::split(*hosts_str, ",");
+    if (hosts.size() > 1)
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multiple hosts not yet 
supported");
+    host_url = hosts[0];
+  } else {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid hosts");
+  }
+
+  auto credentials_service = getCredentialsService(*context);
+  if (!credentials_service)
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing Elasticsearch 
credentials service");
+
+  client_.initialize("POST", host_url + "/_bulk", 
getSSLContextService(*context));
+  client_.setContentType("application/json");
+  credentials_service->authenticateClient(client_);
+}
+
+namespace {
+
+class ElasticPayload {
+ public:
+  [[nodiscard]] std::string toString() const {
+    auto result = headerString();
+    if (payload_) {
+      rapidjson::StringBuffer payload_buffer;
+      rapidjson::Writer<rapidjson::StringBuffer> 
payload_writer(payload_buffer);
+      payload_->Accept(payload_writer);
+      result = result + std::string("\n") + payload_buffer.GetString();
+    }
+    return result;
+  }
+
+  static auto parse(core::ProcessSession& session, core::ProcessContext& 
context, const std::shared_ptr<core::FlowFile>& flow_file) -> 
nonstd::expected<ElasticPayload, std::string> {
+    auto action = context.getProperty(PostElasticsearch::Action, flow_file);
+    if (!action || (action != "index" && action != "create" && action != 
"delete" && action != "update" && action != "upsert"))
+      return nonstd::make_unexpected("Missing or invalid action");
+
+    auto index = context.getProperty(PostElasticsearch::Index, flow_file);
+    if (!index)
+      return nonstd::make_unexpected("Missing index");
+
+    auto id = context.getProperty(PostElasticsearch::Identifier, flow_file);
+    if (!id && (action == "delete" || action == "update" || action == 
"upsert"))
+      return nonstd::make_unexpected("Identifier is required for DELETE,UPDATE 
and UPSERT actions");
+
+    std::optional<rapidjson::Document> payload;
+    if (action == "index" || action == "create") {
+      payload = rapidjson::Document(rapidjson::kObjectType);
+      utils::JsonInputCallback callback(*payload);
+      if (session.read(flow_file, std::ref(callback)) < 0) {
+        return nonstd::make_unexpected("invalid flowfile content");
+      }
+    }
+    if (action == "update" || action == "upsert") {
+      payload = rapidjson::Document(rapidjson::kObjectType);
+      rapidjson::Document doc_member(rapidjson::kObjectType, 
&payload->GetAllocator());
+      utils::JsonInputCallback callback(doc_member);
+      if (session.read(flow_file, std::ref(callback)) < 0) {
+        return nonstd::make_unexpected("invalid flowfile content");
+      }
+      if (action == "upsert") {
+        action = "update";
+        doc_member.AddMember("doc_as_upsert", true, doc_member.GetAllocator());
+      }
+      payload->AddMember("doc", doc_member, payload->GetAllocator());
+    }
+    return ElasticPayload(std::move(*action), std::move(*index), 
std::move(id), std::move(payload));
+  }
+
+ private:
+  ElasticPayload(std::string operation,
+                 std::string index,
+                 std::optional<std::string> id,
+                 std::optional<rapidjson::Document> payload) :
+      operation_(std::move(operation)),
+      index_(std::move(index)),
+      id_(std::move(id)),
+      payload_(std::move(payload)) {
+  }
+
+  [[nodiscard]] std::string headerString() const {
+    rapidjson::Document first_line = 
rapidjson::Document(rapidjson::kObjectType);
+
+    auto operation_index_key = rapidjson::Value(operation_.data(), 
operation_.size(), first_line.GetAllocator());
+    first_line.AddMember(operation_index_key, 
rapidjson::Value{rapidjson::kObjectType}, first_line.GetAllocator());
+    auto& operation_request = first_line[operation_.c_str()];
+
+    auto index_json = rapidjson::Value(index_.data(), index_.size(), 
first_line.GetAllocator());
+    operation_request.AddMember("_index", index_json, 
first_line.GetAllocator());
+
+    if (id_) {
+      auto id_json = rapidjson::Value(id_->data(), id_->size(), 
first_line.GetAllocator());
+      operation_request.AddMember("_id", id_json, first_line.GetAllocator());
+    }
+
+    rapidjson::StringBuffer buffer;
+    rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+    first_line.Accept(writer);
+
+    return buffer.GetString();
+  }
+
+  std::string operation_;
+  std::string index_;
+  std::optional<std::string> id_;
+  std::optional<rapidjson::Document> payload_;
+};
+
+auto submitRequest(utils::HTTPClient& client, const size_t expected_items) -> 
nonstd::expected<rapidjson::Document, std::string> {
+  if (!client.submit())
+    return nonstd::make_unexpected("Submit failed");
+  auto response_code = client.getResponseCode();
+  if (response_code != 200)
+    return nonstd::make_unexpected("Error occurred: " + 
std::to_string(response_code) + ", " + client.getResponseBody().data());
+  rapidjson::Document response;
+  rapidjson::ParseResult parse_result = 
response.Parse<rapidjson::kParseStopWhenDoneFlag>(client.getResponseBody().data());
+  if (parse_result.IsError())
+    return nonstd::make_unexpected("Response is not valid json");
+  if (!response.HasMember("items"))
+    return nonstd::make_unexpected("Response is invalid");
+  if (response["items"].Size() != expected_items)
+    return nonstd::make_unexpected("The number of responses dont match the 
number of requests");
+
+  return response;
+}
+
+void addAttributesFromResponse(std::string name, 
rapidjson::Value::ConstMemberIterator object, core::FlowFile& flow_file) {
+  name = name + "." + object->name.GetString();
+
+  if (object->value.IsObject()) {
+    for (auto it = object->value.MemberBegin(); it != 
object->value.MemberEnd(); ++it) {
+      addAttributesFromResponse(name, it, flow_file);
+    }
+  } else if (object->value.IsInt64()) {
+    flow_file.addAttribute(name, std::to_string(object->value.GetInt64()));
+  } else if (object->value.IsString()) {
+    flow_file.addAttribute(name, object->value.GetString());
+  } else if (object->value.IsBool()) {
+    flow_file.addAttribute(name, std::to_string(object->value.GetBool()));
+  }
+}
+}  // namespace
+
+void PostElasticsearch::onTrigger(const std::shared_ptr<core::ProcessContext>& 
context, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(context && session && max_batch_size_ > 0);
+  std::stringstream payload;
+  std::vector<std::shared_ptr<core::FlowFile>> flowfiles_in_payload;
+  for (size_t flow_files_processed = 0; flow_files_processed < 
max_batch_size_; ++flow_files_processed) {
+    auto flow_file = session->get();
+    if (!flow_file)
+      break;
+    auto elastic_payload = ElasticPayload::parse(*session, *context, 
flow_file);
+    if (!elastic_payload) {
+      logger_->log_error(elastic_payload.error().c_str());
+      session->transfer(flow_file, Failure);
+      continue;
+    }
+
+    payload << elastic_payload->toString() << "\n";
+    flowfiles_in_payload.push_back(flow_file);
+  }
+
+  if (flowfiles_in_payload.empty()) {
+    yield();

Review Comment:
   Should we yield here? Even if `flowfiles_in_payload` is empty, several other
flowfiles may have been sent to the Failure relationship in this trigger, so the
incoming queue wasn't necessarily empty.



##########
docker/test/integration/features/elasticsearch.feature:
##########
@@ -0,0 +1,96 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+Feature: Managing documents on Elasticsearch with PostElasticsearch
+
+  Background:
+    Given the content of "/tmp/output" is monitored
+
+  @no-ci  # Elasticsearch container requires more RAM than what the CI 
environment has
+  Scenario: MiNiFi instance indexes a document on Elasticsearch using Basic 
Authentication
+    Given an Elasticsearch server is set up and running
+    And a GetFile processor with the "Input Directory" property set to 
"/tmp/input"
+    And a file with the content "{ "field1" : "value1" }" is present in 
"/tmp/input"
+    And a PostElasticsearch processor
+    And the "Index" property of the PostElasticsearch processor is set to 
"my_index"
+    And the "Identifier" property of the PostElasticsearch processor is set to 
"my_id"
+    And the "Action" property of the PostElasticsearch processor is set to 
"index"
+    And a SSL context service is set up for PostElasticsearch and Elasticsearch
+    And an ElasticsearchCredentialsService is set up for PostElasticsearch 
with Basic Authentication
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the GetFile processor is connected to 
the PostElasticsearch
+    And the "success" relationship of the PostElasticsearch processor is 
connected to the PutFile
+
+    When both instances start up
+    Then a flowfile with the content "{ "field1" : "value1" }" is placed in 
the monitored directory in less than 20 seconds
+    And Elasticsearch has a document with "my_id" in "my_index" that has 
"value1" set in "field1"
+
+  @no-ci  # Elasticsearch container requires more RAM than what the CI 
environment has
+  Scenario: MiNiFi instance creates a document on Elasticsearch using Basic 
Authentication

Review Comment:
   The scenario name should say "authenticating with API Key" instead of "using Basic Authentication".



##########
extensions/elasticsearch/PostElasticsearch.cpp:
##########
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include "PostElasticsearch.h"
+#include <vector>
+#include <utility>
+
+#include "ElasticsearchCredentialsControllerService.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/PropertyBuilder.h"
+#include "core/Resource.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stream.h"
+#include "rapidjson/writer.h"
+#include "utils/expected.h"
+#include "utils/JsonCallback.h"
+
+namespace org::apache::nifi::minifi::extensions::elasticsearch {
+
+const core::Relationship PostElasticsearch::Success("success", "All flowfiles 
that succeed in being transferred into Elasticsearch go here.");
+const core::Relationship PostElasticsearch::Failure("failure", "All flowfiles 
that fail for reasons unrelated to server availability go to this 
relationship.");
+const core::Relationship PostElasticsearch::Error("error", "All flowfiles that 
Elasticsearch responded to with an error go to this relationship.");
+
+const core::Property PostElasticsearch::Action = 
core::PropertyBuilder::createProperty("Action")
+    ->withDescription("The type of the operation used to index (create, 
delete, index, update, upsert)")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PostElasticsearch::MaxBatchSize = 
core::PropertyBuilder::createProperty("Max Batch Size")
+    ->withDescription("The maximum number of Syslog events to process at a 
time.")
+    ->withDefaultValue<uint64_t>(100)
+    ->build();
+
+const core::Property PostElasticsearch::ElasticCredentials = 
core::PropertyBuilder::createProperty("Elasticsearch Credentials Provider 
Service")
+    ->withDescription("The Controller Service used to obtain Elasticsearch 
credentials.")
+    ->isRequired(true)
+    ->asType<ElasticsearchCredentialsControllerService>()
+    ->build();
+
+const core::Property PostElasticsearch::SSLContext = 
core::PropertyBuilder::createProperty("SSL Context Service")
+    ->withDescription("The SSL Context Service used to provide client 
certificate "
+                      "information for TLS/SSL (https) connections.")
+    ->isRequired(false)
+    ->asType<minifi::controllers::SSLContextService>()->build();
+
+const core::Property PostElasticsearch::Hosts = 
core::PropertyBuilder::createProperty("Hosts")
+    ->withDescription("A comma-separated list of HTTP hosts that host 
Elasticsearch query nodes. Currently only supports a single host.")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PostElasticsearch::Index = 
core::PropertyBuilder::createProperty("Index")
+    ->withDescription("The name of the index to use.")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PostElasticsearch::Identifier = 
core::PropertyBuilder::createProperty("Identifier")
+    ->withDescription("If the Action is \"index\" or \"create\", this property 
may be left empty or evaluate to an empty value, "
+                      "in which case the document's identifier will be 
auto-generated by Elasticsearch. "
+                      "For all other Actions, the attribute must evaluate to a 
non-empty value.")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+
+void PostElasticsearch::initialize() {
+  setSupportedProperties(properties());
+  setSupportedRelationships(relationships());
+}
+
+namespace {
+auto getSSLContextService(core::ProcessContext& context) {
+  if (auto ssl_context = context.getProperty(PostElasticsearch::SSLContext))
+    return 
std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context.getControllerService(*ssl_context));
+  return std::shared_ptr<minifi::controllers::SSLContextService>{};
+}
+
+auto getCredentialsService(core::ProcessContext& context) {
+  if (auto credentials = 
context.getProperty(PostElasticsearch::ElasticCredentials))
+    return 
std::dynamic_pointer_cast<ElasticsearchCredentialsControllerService>(context.getControllerService(*credentials));
+  return std::shared_ptr<ElasticsearchCredentialsControllerService>{};
+}
+}  // namespace
+
+void PostElasticsearch::onSchedule(const 
std::shared_ptr<core::ProcessContext>& context, const 
std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context);
+
+  context->getProperty(MaxBatchSize.getName(), max_batch_size_);
+  if (max_batch_size_ < 1)
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Max Batch Size property is 
invalid");
+
+  std::string host_url{};
+  if (auto hosts_str = context->getProperty(Hosts)) {
+    auto hosts = utils::StringUtils::split(*hosts_str, ",");
+    if (hosts.size() > 1)
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multiple hosts not yet 
supported");
+    host_url = hosts[0];
+  } else {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid hosts");
+  }
+
+  auto credentials_service = getCredentialsService(*context);
+  if (!credentials_service)
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing Elasticsearch 
credentials service");
+
+  client_.initialize("POST", host_url + "/_bulk", 
getSSLContextService(*context));
+  client_.setContentType("application/json");
+  credentials_service->authenticateClient(client_);
+}
+
+namespace {
+
+class ElasticPayload {
+ public:
+  [[nodiscard]] std::string toString() const {
+    auto result = headerString();
+    if (payload_) {
+      rapidjson::StringBuffer payload_buffer;
+      rapidjson::Writer<rapidjson::StringBuffer> 
payload_writer(payload_buffer);
+      payload_->Accept(payload_writer);
+      result = result + std::string("\n") + payload_buffer.GetString();
+    }
+    return result;
+  }
+
+  static auto parse(core::ProcessSession& session, core::ProcessContext& 
context, const std::shared_ptr<core::FlowFile>& flow_file) -> 
nonstd::expected<ElasticPayload, std::string> {
+    auto action = context.getProperty(PostElasticsearch::Action, flow_file);
+    if (!action || (action != "index" && action != "create" && action != 
"delete" && action != "update" && action != "upsert"))
+      return nonstd::make_unexpected("Missing or invalid action");
+
+    auto index = context.getProperty(PostElasticsearch::Index, flow_file);
+    if (!index)
+      return nonstd::make_unexpected("Missing index");
+
+    auto id = context.getProperty(PostElasticsearch::Identifier, flow_file);
+    if (!id && (action == "delete" || action == "update" || action == 
"upsert"))
+      return nonstd::make_unexpected("Identifier is required for DELETE,UPDATE 
and UPSERT actions");
+
+    std::optional<rapidjson::Document> payload;
+    if (action == "index" || action == "create") {
+      payload = rapidjson::Document(rapidjson::kObjectType);
+      utils::JsonInputCallback callback(*payload);
+      if (session.read(flow_file, std::ref(callback)) < 0) {
+        return nonstd::make_unexpected("invalid flowfile content");
+      }
+    }
+    if (action == "update" || action == "upsert") {
+      payload = rapidjson::Document(rapidjson::kObjectType);
+      rapidjson::Document doc_member(rapidjson::kObjectType, 
&payload->GetAllocator());
+      utils::JsonInputCallback callback(doc_member);
+      if (session.read(flow_file, std::ref(callback)) < 0) {
+        return nonstd::make_unexpected("invalid flowfile content");
+      }
+      if (action == "upsert") {
+        action = "update";
+        doc_member.AddMember("doc_as_upsert", true, doc_member.GetAllocator());
+      }
+      payload->AddMember("doc", doc_member, payload->GetAllocator());
+    }
+    return ElasticPayload(std::move(*action), std::move(*index), 
std::move(id), std::move(payload));
+  }
+
+ private:
+  ElasticPayload(std::string operation,
+                 std::string index,
+                 std::optional<std::string> id,
+                 std::optional<rapidjson::Document> payload) :
+      operation_(std::move(operation)),
+      index_(std::move(index)),
+      id_(std::move(id)),
+      payload_(std::move(payload)) {
+  }
+
+  [[nodiscard]] std::string headerString() const {
+    rapidjson::Document first_line = 
rapidjson::Document(rapidjson::kObjectType);
+
+    auto operation_index_key = rapidjson::Value(operation_.data(), 
operation_.size(), first_line.GetAllocator());
+    first_line.AddMember(operation_index_key, 
rapidjson::Value{rapidjson::kObjectType}, first_line.GetAllocator());
+    auto& operation_request = first_line[operation_.c_str()];
+
+    auto index_json = rapidjson::Value(index_.data(), index_.size(), 
first_line.GetAllocator());
+    operation_request.AddMember("_index", index_json, 
first_line.GetAllocator());
+
+    if (id_) {
+      auto id_json = rapidjson::Value(id_->data(), id_->size(), 
first_line.GetAllocator());
+      operation_request.AddMember("_id", id_json, first_line.GetAllocator());
+    }
+
+    rapidjson::StringBuffer buffer;
+    rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+    first_line.Accept(writer);
+
+    return buffer.GetString();
+  }
+
+  std::string operation_;
+  std::string index_;
+  std::optional<std::string> id_;
+  std::optional<rapidjson::Document> payload_;
+};
+
+auto submitRequest(utils::HTTPClient& client, const size_t expected_items) -> 
nonstd::expected<rapidjson::Document, std::string> {
+  if (!client.submit())
+    return nonstd::make_unexpected("Submit failed");
+  auto response_code = client.getResponseCode();
+  if (response_code != 200)
+    return nonstd::make_unexpected("Error occurred: " + 
std::to_string(response_code) + ", " + client.getResponseBody().data());
+  rapidjson::Document response;
+  rapidjson::ParseResult parse_result = 
response.Parse<rapidjson::kParseStopWhenDoneFlag>(client.getResponseBody().data());
+  if (parse_result.IsError())
+    return nonstd::make_unexpected("Response is not valid json");
+  if (!response.HasMember("items"))
+    return nonstd::make_unexpected("Response is invalid");
+  if (response["items"].Size() != expected_items)
+    return nonstd::make_unexpected("The number of responses dont match the 
number of requests");
+
+  return response;
+}
+
+void addAttributesFromResponse(std::string name, 
rapidjson::Value::ConstMemberIterator object, core::FlowFile& flow_file) {
+  name = name + "." + object->name.GetString();
+
+  if (object->value.IsObject()) {
+    for (auto it = object->value.MemberBegin(); it != 
object->value.MemberEnd(); ++it) {
+      addAttributesFromResponse(name, it, flow_file);
+    }
+  } else if (object->value.IsInt64()) {
+    flow_file.addAttribute(name, std::to_string(object->value.GetInt64()));
+  } else if (object->value.IsString()) {
+    flow_file.addAttribute(name, object->value.GetString());
+  } else if (object->value.IsBool()) {
+    flow_file.addAttribute(name, std::to_string(object->value.GetBool()));
+  }
+}
+}  // namespace
+
+void PostElasticsearch::onTrigger(const std::shared_ptr<core::ProcessContext>& 
context, const std::shared_ptr<core::ProcessSession>& session) {

Review Comment:
   I would consider extracting some parts into helper functions, like `collectFlowFiles` and `processSubmitResults`,
but I don't insist if you think it's more readable this way.



##########
extensions/elasticsearch/tests/PostElasticsearchTests.cpp:
##########
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "../PostElasticsearch.h"
+#include "../ElasticsearchCredentialsControllerService.h"
+#include "MockElastic.h"
+#include "SingleProcessorTestController.h"
+#include "Catch.h"
+
+namespace org::apache::nifi::minifi::extensions::elasticsearch::test {
+
+TEST_CASE("PostElasticsearch", "[elastic]") {
+  MockElastic mock_elastic("10433");
+
+  std::shared_ptr<PostElasticsearch> put_elasticsearch_json = 
std::make_shared<PostElasticsearch>("PostElasticsearch");
+  minifi::test::SingleProcessorTestController 
test_controller{put_elasticsearch_json};
+  auto elasticsearch_credentials_controller_service = 
test_controller.plan->addController("ElasticsearchCredentialsControllerService",
 "elasticsearch_credentials_controller_service");
+  CHECK(test_controller.plan->setProperty(put_elasticsearch_json,
+                                     
PostElasticsearch::ElasticCredentials.getName(),
+                                     
"elasticsearch_credentials_controller_service"));
+  CHECK(test_controller.plan->setProperty(put_elasticsearch_json,
+                                    PostElasticsearch::Hosts.getName(),
+                                    "localhost:10433"));
+  CHECK(test_controller.plan->setProperty(put_elasticsearch_json,
+                                    PostElasticsearch::Action.getName(),
+                                    "${elastic_action}"));
+  CHECK(test_controller.plan->setProperty(put_elasticsearch_json,
+                                    PostElasticsearch::Index.getName(),
+                                    "test_index"));
+
+  SECTION("Index with valid basic authentication") {
+    
CHECK(test_controller.plan->setProperty(elasticsearch_credentials_controller_service,
+                                            
ElasticsearchCredentialsControllerService::Username.getName(),
+                                            MockElasticAuthHandler::USERNAME));
+    
CHECK(test_controller.plan->setProperty(elasticsearch_credentials_controller_service,
+                                            
ElasticsearchCredentialsControllerService::Password.getName(),
+                                            MockElasticAuthHandler::PASSWORD));
+
+    std::vector<std::tuple<const std::string_view, 
std::unordered_map<std::string, std::string>>> inputs;
+
+    auto results = test_controller.trigger({std::make_tuple<const 
std::string_view, std::unordered_map<std::string, 
std::string>>(R"({"field1":"value1"}")", {{"elastic_action", "index"}}),
+                                            std::make_tuple<const 
std::string_view, std::unordered_map<std::string, 
std::string>>(R"({"field1":"value2"}")", {{"elastic_action", "index"}})});
+    REQUIRE(results[PostElasticsearch::Success].size() == 2);
+    for (const auto& result : results[PostElasticsearch::Success]) {
+      auto attributes = result->getAttributes();
+      CHECK(attributes.contains("elasticsearch.index._id"));
+      CHECK(attributes.contains("elasticsearch.index._index"));
+    }
+  }
+
+  SECTION("Update with valid ApiKey") {
+    
CHECK(test_controller.plan->setProperty(elasticsearch_credentials_controller_service,
+                                            
ElasticsearchCredentialsControllerService::ApiKey.getName(),
+                                            MockElasticAuthHandler::API_KEY));
+    CHECK(test_controller.plan->setProperty(put_elasticsearch_json,
+                                            
PostElasticsearch::Identifier.getName(),
+                                            "${filename}"));
+
+    auto results = test_controller.trigger(R"({"field1":"value1"}")", 
{{"elastic_action", "upsert"}});
+    REQUIRE(results[PostElasticsearch::Success].size() == 1);
+    auto attributes = results[PostElasticsearch::Success][0]->getAttributes();
+    CHECK(attributes.contains("elasticsearch.update._id"));
+    CHECK(attributes.contains("elasticsearch.update._index"));
+  }
+
+  SECTION("Invalid ApiKey") {
+    
CHECK(test_controller.plan->setProperty(elasticsearch_credentials_controller_service,
+                                            
ElasticsearchCredentialsControllerService::ApiKey.getName(),
+                                            "invalid_api_key"));
+
+    auto results = test_controller.trigger(R"({"field1":"value1"}")", 
{{"elastic_action", "create"}});
+    CHECK(results[PostElasticsearch::Failure].size() == 1);
+  }
+
+  SECTION("Invalid basic authentication") {

Review Comment:
   Could you add a test for the Error relationship case as well?



##########
extensions/elasticsearch/PostElasticsearch.cpp:
##########
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include "PostElasticsearch.h"
+#include <vector>
+#include <utility>
+
+#include "ElasticsearchCredentialsControllerService.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/PropertyBuilder.h"
+#include "core/Resource.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stream.h"
+#include "rapidjson/writer.h"
+#include "utils/expected.h"
+#include "utils/JsonCallback.h"
+
+namespace org::apache::nifi::minifi::extensions::elasticsearch {
+
+const core::Relationship PostElasticsearch::Success("success", "All flowfiles 
that succeed in being transferred into Elasticsearch go here.");
+const core::Relationship PostElasticsearch::Failure("failure", "All flowfiles 
that fail for reasons unrelated to server availability go to this 
relationship.");
+const core::Relationship PostElasticsearch::Error("error", "All flowfiles that 
Elasticsearch responded to with an error go to this relationship.");
+
+const core::Property PostElasticsearch::Action = 
core::PropertyBuilder::createProperty("Action")
+    ->withDescription("The type of the operation used to index (create, 
delete, index, update, upsert)")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PostElasticsearch::MaxBatchSize = 
core::PropertyBuilder::createProperty("Max Batch Size")
+    ->withDescription("The maximum number of Syslog events to process at a 
time.")
+    ->withDefaultValue<uint64_t>(100)
+    ->build();
+
+const core::Property PostElasticsearch::ElasticCredentials = 
core::PropertyBuilder::createProperty("Elasticsearch Credentials Provider 
Service")
+    ->withDescription("The Controller Service used to obtain Elasticsearch 
credentials.")
+    ->isRequired(true)
+    ->asType<ElasticsearchCredentialsControllerService>()
+    ->build();
+
+const core::Property PostElasticsearch::SSLContext = 
core::PropertyBuilder::createProperty("SSL Context Service")
+    ->withDescription("The SSL Context Service used to provide client 
certificate "
+                      "information for TLS/SSL (https) connections.")
+    ->isRequired(false)
+    ->asType<minifi::controllers::SSLContextService>()->build();
+
+const core::Property PostElasticsearch::Hosts = 
core::PropertyBuilder::createProperty("Hosts")
+    ->withDescription("A comma-separated list of HTTP hosts that host 
Elasticsearch query nodes. Currently only supports a single host.")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PostElasticsearch::Index = 
core::PropertyBuilder::createProperty("Index")
+    ->withDescription("The name of the index to use.")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PostElasticsearch::Identifier = 
core::PropertyBuilder::createProperty("Identifier")
+    ->withDescription("If the Action is \"index\" or \"create\", this property 
may be left empty or evaluate to an empty value, "
+                      "in which case the document's identifier will be 
auto-generated by Elasticsearch. "
+                      "For all other Actions, the attribute must evaluate to a 
non-empty value.")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+
+void PostElasticsearch::initialize() {
+  setSupportedProperties(properties());
+  setSupportedRelationships(relationships());
+}
+
+namespace {
+auto getSSLContextService(core::ProcessContext& context) {
+  if (auto ssl_context = context.getProperty(PostElasticsearch::SSLContext))
+    return 
std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context.getControllerService(*ssl_context));
+  return std::shared_ptr<minifi::controllers::SSLContextService>{};
+}
+
+auto getCredentialsService(core::ProcessContext& context) {
+  if (auto credentials = 
context.getProperty(PostElasticsearch::ElasticCredentials))
+    return 
std::dynamic_pointer_cast<ElasticsearchCredentialsControllerService>(context.getControllerService(*credentials));
+  return std::shared_ptr<ElasticsearchCredentialsControllerService>{};
+}
+}  // namespace
+
+void PostElasticsearch::onSchedule(const 
std::shared_ptr<core::ProcessContext>& context, const 
std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context);
+
+  context->getProperty(MaxBatchSize.getName(), max_batch_size_);
+  if (max_batch_size_ < 1)
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Max Batch Size property is 
invalid");
+
+  std::string host_url{};
+  if (auto hosts_str = context->getProperty(Hosts)) {
+    auto hosts = utils::StringUtils::split(*hosts_str, ",");
+    if (hosts.size() > 1)
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multiple hosts not yet 
supported");
+    host_url = hosts[0];
+  } else {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid hosts");
+  }
+
+  auto credentials_service = getCredentialsService(*context);
+  if (!credentials_service)
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing Elasticsearch 
credentials service");
+
+  client_.initialize("POST", host_url + "/_bulk", 
getSSLContextService(*context));
+  client_.setContentType("application/json");
+  credentials_service->authenticateClient(client_);
+}
+
+namespace {
+
+class ElasticPayload {
+ public:
+  [[nodiscard]] std::string toString() const {
+    auto result = headerString();
+    if (payload_) {
+      rapidjson::StringBuffer payload_buffer;
+      rapidjson::Writer<rapidjson::StringBuffer> 
payload_writer(payload_buffer);
+      payload_->Accept(payload_writer);
+      result = result + std::string("\n") + payload_buffer.GetString();
+    }
+    return result;
+  }
+
+  static auto parse(core::ProcessSession& session, core::ProcessContext& 
context, const std::shared_ptr<core::FlowFile>& flow_file) -> 
nonstd::expected<ElasticPayload, std::string> {
+    auto action = context.getProperty(PostElasticsearch::Action, flow_file);
+    if (!action || (action != "index" && action != "create" && action != 
"delete" && action != "update" && action != "upsert"))
+      return nonstd::make_unexpected("Missing or invalid action");
+
+    auto index = context.getProperty(PostElasticsearch::Index, flow_file);
+    if (!index)
+      return nonstd::make_unexpected("Missing index");
+
+    auto id = context.getProperty(PostElasticsearch::Identifier, flow_file);
+    if (!id && (action == "delete" || action == "update" || action == 
"upsert"))
+      return nonstd::make_unexpected("Identifier is required for DELETE,UPDATE 
and UPSERT actions");
+
+    std::optional<rapidjson::Document> payload;
+    if (action == "index" || action == "create") {
+      payload = rapidjson::Document(rapidjson::kObjectType);
+      utils::JsonInputCallback callback(*payload);
+      if (session.read(flow_file, std::ref(callback)) < 0) {
+        return nonstd::make_unexpected("invalid flowfile content");
+      }
+    }
+    if (action == "update" || action == "upsert") {
+      payload = rapidjson::Document(rapidjson::kObjectType);
+      rapidjson::Document doc_member(rapidjson::kObjectType, 
&payload->GetAllocator());
+      utils::JsonInputCallback callback(doc_member);
+      if (session.read(flow_file, std::ref(callback)) < 0) {
+        return nonstd::make_unexpected("invalid flowfile content");
+      }
+      if (action == "upsert") {
+        action = "update";
+        doc_member.AddMember("doc_as_upsert", true, doc_member.GetAllocator());
+      }
+      payload->AddMember("doc", doc_member, payload->GetAllocator());
+    }
+    return ElasticPayload(std::move(*action), std::move(*index), 
std::move(id), std::move(payload));
+  }
+
+ private:
+  ElasticPayload(std::string operation,
+                 std::string index,
+                 std::optional<std::string> id,
+                 std::optional<rapidjson::Document> payload) :
+      operation_(std::move(operation)),
+      index_(std::move(index)),
+      id_(std::move(id)),
+      payload_(std::move(payload)) {
+  }
+
+  [[nodiscard]] std::string headerString() const {
+    rapidjson::Document first_line = 
rapidjson::Document(rapidjson::kObjectType);
+
+    auto operation_index_key = rapidjson::Value(operation_.data(), 
operation_.size(), first_line.GetAllocator());
+    first_line.AddMember(operation_index_key, 
rapidjson::Value{rapidjson::kObjectType}, first_line.GetAllocator());
+    auto& operation_request = first_line[operation_.c_str()];
+
+    auto index_json = rapidjson::Value(index_.data(), index_.size(), 
first_line.GetAllocator());
+    operation_request.AddMember("_index", index_json, 
first_line.GetAllocator());
+
+    if (id_) {
+      auto id_json = rapidjson::Value(id_->data(), id_->size(), 
first_line.GetAllocator());
+      operation_request.AddMember("_id", id_json, first_line.GetAllocator());
+    }
+
+    rapidjson::StringBuffer buffer;
+    rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+    first_line.Accept(writer);
+
+    return buffer.GetString();
+  }
+
+  std::string operation_;
+  std::string index_;
+  std::optional<std::string> id_;
+  std::optional<rapidjson::Document> payload_;
+};
+
+auto submitRequest(utils::HTTPClient& client, const size_t expected_items) -> 
nonstd::expected<rapidjson::Document, std::string> {
+  if (!client.submit())
+    return nonstd::make_unexpected("Submit failed");
+  auto response_code = client.getResponseCode();
+  if (response_code != 200)
+    return nonstd::make_unexpected("Error occurred: " + 
std::to_string(response_code) + ", " + client.getResponseBody().data());
+  rapidjson::Document response;
+  rapidjson::ParseResult parse_result = 
response.Parse<rapidjson::kParseStopWhenDoneFlag>(client.getResponseBody().data());
+  if (parse_result.IsError())
+    return nonstd::make_unexpected("Response is not valid json");
+  if (!response.HasMember("items"))
+    return nonstd::make_unexpected("Response is invalid");
+  if (response["items"].Size() != expected_items)
+    return nonstd::make_unexpected("The number of responses dont match the 
number of requests");
+
+  return response;
+}
+
+void addAttributesFromResponse(std::string name, 
rapidjson::Value::ConstMemberIterator object, core::FlowFile& flow_file) {
+  name = name + "." + object->name.GetString();
+
+  if (object->value.IsObject()) {
+    for (auto it = object->value.MemberBegin(); it != 
object->value.MemberEnd(); ++it) {
+      addAttributesFromResponse(name, it, flow_file);
+    }
+  } else if (object->value.IsInt64()) {
+    flow_file.addAttribute(name, std::to_string(object->value.GetInt64()));
+  } else if (object->value.IsString()) {
+    flow_file.addAttribute(name, object->value.GetString());
+  } else if (object->value.IsBool()) {
+    flow_file.addAttribute(name, std::to_string(object->value.GetBool()));
+  }
+}
+}  // namespace
+
+void PostElasticsearch::onTrigger(const std::shared_ptr<core::ProcessContext>& 
context, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(context && session && max_batch_size_ > 0);
+  std::stringstream payload;
+  std::vector<std::shared_ptr<core::FlowFile>> flowfiles_in_payload;
+  for (size_t flow_files_processed = 0; flow_files_processed < 
max_batch_size_; ++flow_files_processed) {
+    auto flow_file = session->get();
+    if (!flow_file)
+      break;
+    auto elastic_payload = ElasticPayload::parse(*session, *context, 
flow_file);
+    if (!elastic_payload) {
+      logger_->log_error(elastic_payload.error().c_str());
+      session->transfer(flow_file, Failure);
+      continue;
+    }
+
+    payload << elastic_payload->toString() << "\n";
+    flowfiles_in_payload.push_back(flow_file);
+  }
+
+  if (flowfiles_in_payload.empty()) {
+    yield();
+    return;
+  }
+
+
+  client_.setPostFields(payload.str());
+  auto result = submitRequest(client_, flowfiles_in_payload.size());
+  if (!result) {
+    logger_->log_error(result.error().c_str());
+    for (const auto& flow_file_in_payload: flowfiles_in_payload)
+      session->transfer(flow_file_in_payload, Failure);
+    return;
+  }
+
+  auto& items = result->operator[]("items");
+  gsl_Expects(items.Size() == flowfiles_in_payload.size());
+  for (size_t i = 0; i < items.Size(); ++i) {
+    for (auto it = items[i].MemberBegin(); it != items[i].MemberEnd(); ++it) {
+      addAttributesFromResponse("elasticsearch", it, *flowfiles_in_payload[i]);
+    }
+    if (items[i].MemberBegin()->value.HasMember("error"))

Review Comment:
   If an `error` attribute is present, are any other attributes received and set 
on the flowfile? Do we want to set those in the error case as well? I see that in 
NiFi's PutElasticsearchJson processor, for example, a specific 
`elasticsearch.put.error` attribute is set if an error occurs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to