martinzink commented on code in PR #1349:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1349#discussion_r907253717


##########
extensions/http-curl/client/HTTPClient.h:
##########
@@ -297,6 +301,8 @@ class HTTPClient : public BaseHTTPClient, public 
core::Connectable {
 
   std::chrono::milliseconds keep_alive_idle_{-1};
 
+  std::optional<std::pair<std::string, std::string>> username_password_;

Review Comment:
   Yeah, it does help readability; I've changed it in 
https://github.com/apache/nifi-minifi-cpp/pull/1349/commits/fdb1bf2a34423815cfe2cb395e83dcee3bc2f1cd#diff-475912c726caf957fa46ce1b55d3e845e89aa19b4c7caf2cd4a0ae1f860cbdd5R304-R311



##########
libminifi/test/SingleProcessorTestController.h:
##########
@@ -57,6 +58,14 @@ class SingleProcessorTestController : public TestController {
     return trigger();
   }
 
+  auto trigger(std::initializer_list<std::tuple<const std::string_view, 
std::unordered_map<std::string, std::string>>> flow_files) {

Review Comment:
   good idea, I've changed it in 
https://github.com/apache/nifi-minifi-cpp/pull/1349/commits/fdb1bf2a34423815cfe2cb395e83dcee3bc2f1cd#diff-a019248b6c700d2a0bc69681c6ea011e5afe07861a32a021f992375eeda11847R70



##########
extensions/elasticsearch/tests/PostElasticsearchTests.cpp:
##########
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "../PostElasticsearch.h"
+#include "../ElasticsearchCredentialsControllerService.h"
+#include "MockElastic.h"
+#include "SingleProcessorTestController.h"
+#include "Catch.h"
+
+namespace org::apache::nifi::minifi::extensions::elasticsearch::test {
+
+TEST_CASE("PostElasticsearch", "[elastic]") {
+  MockElastic mock_elastic("10433");
+
+  std::shared_ptr<PostElasticsearch> put_elasticsearch_json = 
std::make_shared<PostElasticsearch>("PostElasticsearch");
+  minifi::test::SingleProcessorTestController 
test_controller{put_elasticsearch_json};
+  auto elasticsearch_credentials_controller_service = 
test_controller.plan->addController("ElasticsearchCredentialsControllerService",
 "elasticsearch_credentials_controller_service");
+  CHECK(test_controller.plan->setProperty(put_elasticsearch_json,
+                                     
PostElasticsearch::ElasticCredentials.getName(),
+                                     
"elasticsearch_credentials_controller_service"));
+  CHECK(test_controller.plan->setProperty(put_elasticsearch_json,
+                                    PostElasticsearch::Hosts.getName(),
+                                    "localhost:10433"));
+  CHECK(test_controller.plan->setProperty(put_elasticsearch_json,
+                                    PostElasticsearch::Action.getName(),
+                                    "${elastic_action}"));
+  CHECK(test_controller.plan->setProperty(put_elasticsearch_json,
+                                    PostElasticsearch::Index.getName(),
+                                    "test_index"));
+
+  SECTION("Index with valid basic authentication") {
+    
CHECK(test_controller.plan->setProperty(elasticsearch_credentials_controller_service,
+                                            
ElasticsearchCredentialsControllerService::Username.getName(),
+                                            MockElasticAuthHandler::USERNAME));
+    
CHECK(test_controller.plan->setProperty(elasticsearch_credentials_controller_service,
+                                            
ElasticsearchCredentialsControllerService::Password.getName(),
+                                            MockElasticAuthHandler::PASSWORD));
+
+    std::vector<std::tuple<const std::string_view, 
std::unordered_map<std::string, std::string>>> inputs;

Review Comment:
   good catch, removed it in 
https://github.com/apache/nifi-minifi-cpp/pull/1349/commits/fdb1bf2a34423815cfe2cb395e83dcee3bc2f1cd#diff-da99b94efb790428cd1298692800cab09ee2b398538b67ec7f0d5e4bfae2605dL53-L56



##########
extensions/elasticsearch/tests/PostElasticsearchTests.cpp:
##########
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "../PostElasticsearch.h"
+#include "../ElasticsearchCredentialsControllerService.h"
+#include "MockElastic.h"
+#include "SingleProcessorTestController.h"
+#include "Catch.h"
+
+namespace org::apache::nifi::minifi::extensions::elasticsearch::test {
+
+TEST_CASE("PostElasticsearch", "[elastic]") {
+  MockElastic mock_elastic("10433");
+
+  std::shared_ptr<PostElasticsearch> put_elasticsearch_json = 
std::make_shared<PostElasticsearch>("PostElasticsearch");

Review Comment:
   :+1: 
https://github.com/apache/nifi-minifi-cpp/pull/1349/commits/fdb1bf2a34423815cfe2cb395e83dcee3bc2f1cd#diff-da99b94efb790428cd1298692800cab09ee2b398538b67ec7f0d5e4bfae2605dR29



##########
extensions/elasticsearch/PostElasticsearch.cpp:
##########
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include "PostElasticsearch.h"
+#include <vector>
+#include <utility>
+
+#include "ElasticsearchCredentialsControllerService.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/PropertyBuilder.h"
+#include "core/Resource.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stream.h"
+#include "rapidjson/writer.h"
+#include "utils/expected.h"
+#include "utils/JsonCallback.h"
+
+namespace org::apache::nifi::minifi::extensions::elasticsearch {
+
+const core::Relationship PostElasticsearch::Success("success", "All flowfiles 
that succeed in being transferred into Elasticsearch go here.");
+const core::Relationship PostElasticsearch::Failure("failure", "All flowfiles 
that fail for reasons unrelated to server availability go to this 
relationship.");
+const core::Relationship PostElasticsearch::Error("error", "All flowfiles that 
Elasticsearch responded to with an error go to this relationship.");
+
+const core::Property PostElasticsearch::Action = 
core::PropertyBuilder::createProperty("Action")
+    ->withDescription("The type of the operation used to index (create, 
delete, index, update, upsert)")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PostElasticsearch::MaxBatchSize = 
core::PropertyBuilder::createProperty("Max Batch Size")
+    ->withDescription("The maximum number of Syslog events to process at a 
time.")
+    ->withDefaultValue<uint64_t>(100)
+    ->build();
+
+const core::Property PostElasticsearch::ElasticCredentials = 
core::PropertyBuilder::createProperty("Elasticsearch Credentials Provider 
Service")
+    ->withDescription("The Controller Service used to obtain Elasticsearch 
credentials.")
+    ->isRequired(true)
+    ->asType<ElasticsearchCredentialsControllerService>()
+    ->build();
+
+const core::Property PostElasticsearch::SSLContext = 
core::PropertyBuilder::createProperty("SSL Context Service")
+    ->withDescription("The SSL Context Service used to provide client 
certificate "
+                      "information for TLS/SSL (https) connections.")
+    ->isRequired(false)
+    ->asType<minifi::controllers::SSLContextService>()->build();
+
+const core::Property PostElasticsearch::Hosts = 
core::PropertyBuilder::createProperty("Hosts")
+    ->withDescription("A comma-separated list of HTTP hosts that host 
Elasticsearch query nodes. Currently only supports a single host.")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PostElasticsearch::Index = 
core::PropertyBuilder::createProperty("Index")
+    ->withDescription("The name of the index to use.")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PostElasticsearch::Identifier = 
core::PropertyBuilder::createProperty("Identifier")
+    ->withDescription("If the Action is \"index\" or \"create\", this property 
may be left empty or evaluate to an empty value, "
+                      "in which case the document's identifier will be 
auto-generated by Elasticsearch. "
+                      "For all other Actions, the attribute must evaluate to a 
non-empty value.")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+
+void PostElasticsearch::initialize() {
+  setSupportedProperties(properties());
+  setSupportedRelationships(relationships());
+}
+
+namespace {
+auto getSSLContextService(core::ProcessContext& context) {
+  if (auto ssl_context = context.getProperty(PostElasticsearch::SSLContext))
+    return 
std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context.getControllerService(*ssl_context));
+  return std::shared_ptr<minifi::controllers::SSLContextService>{};
+}
+
+auto getCredentialsService(core::ProcessContext& context) {
+  if (auto credentials = 
context.getProperty(PostElasticsearch::ElasticCredentials))
+    return 
std::dynamic_pointer_cast<ElasticsearchCredentialsControllerService>(context.getControllerService(*credentials));
+  return std::shared_ptr<ElasticsearchCredentialsControllerService>{};
+}
+}  // namespace
+
+void PostElasticsearch::onSchedule(const 
std::shared_ptr<core::ProcessContext>& context, const 
std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context);
+
+  context->getProperty(MaxBatchSize.getName(), max_batch_size_);
+  if (max_batch_size_ < 1)
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Max Batch Size property is 
invalid");
+
+  std::string host_url{};
+  if (auto hosts_str = context->getProperty(Hosts)) {
+    auto hosts = utils::StringUtils::split(*hosts_str, ",");
+    if (hosts.size() > 1)
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multiple hosts not yet 
supported");
+    host_url = hosts[0];
+  } else {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid hosts");
+  }
+
+  auto credentials_service = getCredentialsService(*context);
+  if (!credentials_service)
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing Elasticsearch 
credentials service");
+
+  client_.initialize("POST", host_url + "/_bulk", 
getSSLContextService(*context));
+  client_.setContentType("application/json");
+  credentials_service->authenticateClient(client_);
+}
+
+namespace {
+
+class ElasticPayload {
+ public:
+  [[nodiscard]] std::string toString() const {
+    auto result = headerString();
+    if (payload_) {
+      rapidjson::StringBuffer payload_buffer;
+      rapidjson::Writer<rapidjson::StringBuffer> 
payload_writer(payload_buffer);
+      payload_->Accept(payload_writer);
+      result = result + std::string("\n") + payload_buffer.GetString();
+    }
+    return result;
+  }
+
+  static auto parse(core::ProcessSession& session, core::ProcessContext& 
context, const std::shared_ptr<core::FlowFile>& flow_file) -> 
nonstd::expected<ElasticPayload, std::string> {
+    auto action = context.getProperty(PostElasticsearch::Action, flow_file);
+    if (!action || (action != "index" && action != "create" && action != 
"delete" && action != "update" && action != "upsert"))
+      return nonstd::make_unexpected("Missing or invalid action");
+
+    auto index = context.getProperty(PostElasticsearch::Index, flow_file);
+    if (!index)
+      return nonstd::make_unexpected("Missing index");
+
+    auto id = context.getProperty(PostElasticsearch::Identifier, flow_file);
+    if (!id && (action == "delete" || action == "update" || action == 
"upsert"))
+      return nonstd::make_unexpected("Identifier is required for DELETE,UPDATE 
and UPSERT actions");
+
+    std::optional<rapidjson::Document> payload;
+    if (action == "index" || action == "create") {
+      payload = rapidjson::Document(rapidjson::kObjectType);
+      utils::JsonInputCallback callback(*payload);
+      if (session.read(flow_file, std::ref(callback)) < 0) {
+        return nonstd::make_unexpected("invalid flowfile content");
+      }
+    }
+    if (action == "update" || action == "upsert") {
+      payload = rapidjson::Document(rapidjson::kObjectType);
+      rapidjson::Document doc_member(rapidjson::kObjectType, 
&payload->GetAllocator());
+      utils::JsonInputCallback callback(doc_member);
+      if (session.read(flow_file, std::ref(callback)) < 0) {
+        return nonstd::make_unexpected("invalid flowfile content");
+      }
+      if (action == "upsert") {
+        action = "update";
+        doc_member.AddMember("doc_as_upsert", true, doc_member.GetAllocator());
+      }
+      payload->AddMember("doc", doc_member, payload->GetAllocator());
+    }
+    return ElasticPayload(std::move(*action), std::move(*index), 
std::move(id), std::move(payload));
+  }
+
+ private:
+  ElasticPayload(std::string operation,
+                 std::string index,
+                 std::optional<std::string> id,
+                 std::optional<rapidjson::Document> payload) :
+      operation_(std::move(operation)),
+      index_(std::move(index)),
+      id_(std::move(id)),
+      payload_(std::move(payload)) {
+  }
+
+  [[nodiscard]] std::string headerString() const {
+    rapidjson::Document first_line = 
rapidjson::Document(rapidjson::kObjectType);
+
+    auto operation_index_key = rapidjson::Value(operation_.data(), 
operation_.size(), first_line.GetAllocator());
+    first_line.AddMember(operation_index_key, 
rapidjson::Value{rapidjson::kObjectType}, first_line.GetAllocator());
+    auto& operation_request = first_line[operation_.c_str()];
+
+    auto index_json = rapidjson::Value(index_.data(), index_.size(), 
first_line.GetAllocator());
+    operation_request.AddMember("_index", index_json, 
first_line.GetAllocator());
+
+    if (id_) {
+      auto id_json = rapidjson::Value(id_->data(), id_->size(), 
first_line.GetAllocator());
+      operation_request.AddMember("_id", id_json, first_line.GetAllocator());
+    }
+
+    rapidjson::StringBuffer buffer;
+    rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+    first_line.Accept(writer);
+
+    return buffer.GetString();
+  }
+
+  std::string operation_;
+  std::string index_;
+  std::optional<std::string> id_;
+  std::optional<rapidjson::Document> payload_;
+};
+
+auto submitRequest(utils::HTTPClient& client, std::string&& payload, const 
size_t expected_items) -> nonstd::expected<rapidjson::Document, std::string> {

Review Comment:
   Makes sense; I've changed this and the other occurrence in 
https://github.com/apache/nifi-minifi-cpp/pull/1349/commits/fdb1bf2a34423815cfe2cb395e83dcee3bc2f1cd#diff-bdbf058968be4e5a837bb513eb74292c24911b773939d6b3404a7c5e7b9a609dR216



##########
extensions/elasticsearch/PostElasticsearch.cpp:
##########
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include "PostElasticsearch.h"
+#include <vector>
+#include <utility>
+
+#include "ElasticsearchCredentialsControllerService.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/PropertyBuilder.h"
+#include "core/Resource.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stream.h"
+#include "rapidjson/writer.h"
+#include "utils/expected.h"
+#include "utils/JsonCallback.h"
+
+namespace org::apache::nifi::minifi::extensions::elasticsearch {
+
+const core::Relationship PostElasticsearch::Success("success", "All flowfiles 
that succeed in being transferred into Elasticsearch go here.");
+const core::Relationship PostElasticsearch::Failure("failure", "All flowfiles 
that fail for reasons unrelated to server availability go to this 
relationship.");
+const core::Relationship PostElasticsearch::Error("error", "All flowfiles that 
Elasticsearch responded to with an error go to this relationship.");
+
+const core::Property PostElasticsearch::Action = 
core::PropertyBuilder::createProperty("Action")
+    ->withDescription("The type of the operation used to index (create, 
delete, index, update, upsert)")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PostElasticsearch::MaxBatchSize = 
core::PropertyBuilder::createProperty("Max Batch Size")
+    ->withDescription("The maximum number of Syslog events to process at a 
time.")

Review Comment:
   changed this in 
https://github.com/apache/nifi-minifi-cpp/pull/1349/commits/fdb1bf2a34423815cfe2cb395e83dcee3bc2f1cd#diff-bdbf058968be4e5a837bb513eb74292c24911b773939d6b3404a7c5e7b9a609dR46



##########
docker/test/integration/resources/elasticsearch/Dockerfile:
##########
@@ -0,0 +1,7 @@
+FROM elasticsearch:8.2.2
+COPY elasticsearch.yml /usr/share/elasticsearch/config/elasticsearch.yml
+COPY certs/elastic_cert.key 
/usr/share/elasticsearch/config/certs/elastic_cert.key
+COPY certs/elastic_cert.pem 
/usr/share/elasticsearch/config/certs/elastic_cert.pem
+COPY certs/elastic_transport.key 
/usr/share/elasticsearch/config/certs/elastic_transport.key
+COPY certs/elastic_transport.pem 
/usr/share/elasticsearch/config/certs/elastic_transport.pem

Review Comment:
   good idea, I've changed them in 
https://github.com/apache/nifi-minifi-cpp/pull/1349/commits/fdb1bf2a34423815cfe2cb395e83dcee3bc2f1cd#diff-a10da10e9ba31d953619871d6557f4fe8bffa0b1ceb54bf8612b6de6bc1f33e1L3-L7



##########
PROCESSORS.md:
##########
@@ -1640,6 +1641,35 @@ In the list below, the names of required properties 
appear in bold. Any other pr
 | success | All files are routed to success |
 
 
+## PostElasticsearch
+
+### Description
+
+An Elasticsearch/Opensearch post processor that uses the 
Elasticsearch/Opensearch _bulk REST API.
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other 
properties (not in bold) are considered optional. The table also indicates any 
default values, and whether a property supports the NiFi Expression Language.
+
+| Name                                           | Default Value | Allowable 
Values | Description                                                            
                                                                                
                                                                                
                                                                   |
+|------------------------------------------------|---------------|------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| Action                                         |               |             
     | The type of the operation used to index (create, delete, index, update, 
upsert)<br/>**Supports Expression Language: true**                              
                                                                                
                                                                  |
+| Max Batch Size                                 | 100           |             
     | The maximum number of Syslog events to process at a time.                
                                                                                
                                                                                
                                                                 |

Review Comment:
   oops, nice catch, I've included this in 
https://github.com/apache/nifi-minifi-cpp/pull/1349/commits/fdb1bf2a34423815cfe2cb395e83dcee3bc2f1cd#diff-fd2410931e7fdc4bf8b3ce23f5f7a27c7aacdf9337320626d86d806448c90b9bR1656



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to