martinzink commented on code in PR #1349:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1349#discussion_r908575622


##########
extensions/elasticsearch/PostElasticsearch.cpp:
##########
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include "PostElasticsearch.h"
+#include <vector>
+#include <utility>
+
+#include "ElasticsearchCredentialsControllerService.h"
+#include "core/ProcessContext.h"
+#include "core/ProcessSession.h"
+#include "core/PropertyBuilder.h"
+#include "core/Resource.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stream.h"
+#include "rapidjson/writer.h"
+#include "utils/expected.h"
+#include "utils/JsonCallback.h"
+
+namespace org::apache::nifi::minifi::extensions::elasticsearch {
+
+const core::Relationship PostElasticsearch::Success("success", "All flowfiles 
that succeed in being transferred into Elasticsearch go here.");
+const core::Relationship PostElasticsearch::Failure("failure", "All flowfiles 
that fail for reasons unrelated to server availability go to this 
relationship.");
+const core::Relationship PostElasticsearch::Error("error", "All flowfiles that 
Elasticsearch responded to with an error go to this relationship.");
+
+const core::Property PostElasticsearch::Action = 
core::PropertyBuilder::createProperty("Action")
+    ->withDescription("The type of the operation used to index (create, 
delete, index, update, upsert)")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PostElasticsearch::MaxBatchSize = 
core::PropertyBuilder::createProperty("Max Batch Size")
+    ->withDescription("The maximum number of flow files to process at a time.")
+    ->withDefaultValue<uint64_t>(100)
+    ->build();
+
+const core::Property PostElasticsearch::ElasticCredentials = 
core::PropertyBuilder::createProperty("Elasticsearch Credentials Provider 
Service")
+    ->withDescription("The Controller Service used to obtain Elasticsearch 
credentials.")
+    ->isRequired(true)
+    ->asType<ElasticsearchCredentialsControllerService>()
+    ->build();
+
+const core::Property PostElasticsearch::SSLContext = 
core::PropertyBuilder::createProperty("SSL Context Service")
+    ->withDescription("The SSL Context Service used to provide client 
certificate "
+                      "information for TLS/SSL (https) connections.")
+    ->isRequired(false)
+    ->asType<minifi::controllers::SSLContextService>()->build();
+
+const core::Property PostElasticsearch::Hosts = 
core::PropertyBuilder::createProperty("Hosts")
+    ->withDescription("A comma-separated list of HTTP hosts that host 
Elasticsearch query nodes. Currently only supports a single host.")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PostElasticsearch::Index = 
core::PropertyBuilder::createProperty("Index")
+    ->withDescription("The name of the index to use.")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+const core::Property PostElasticsearch::Identifier = 
core::PropertyBuilder::createProperty("Identifier")
+    ->withDescription("If the Action is \"index\" or \"create\", this property 
may be left empty or evaluate to an empty value, "
+                      "in which case the document's identifier will be 
auto-generated by Elasticsearch. "
+                      "For all other Actions, the attribute must evaluate to a 
non-empty value.")
+    ->supportsExpressionLanguage(true)
+    ->build();
+
+
+void PostElasticsearch::initialize() {
+  setSupportedProperties(properties());
+  setSupportedRelationships(relationships());
+}
+
+namespace {
+auto getSSLContextService(core::ProcessContext& context) {
+  if (auto ssl_context = context.getProperty(PostElasticsearch::SSLContext))
+    return 
std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context.getControllerService(*ssl_context));
+  return std::shared_ptr<minifi::controllers::SSLContextService>{};
+}
+
+auto getCredentialsService(core::ProcessContext& context) {
+  if (auto credentials = 
context.getProperty(PostElasticsearch::ElasticCredentials))
+    return 
std::dynamic_pointer_cast<ElasticsearchCredentialsControllerService>(context.getControllerService(*credentials));
+  return std::shared_ptr<ElasticsearchCredentialsControllerService>{};
+}
+}  // namespace
+
+void PostElasticsearch::onSchedule(const 
std::shared_ptr<core::ProcessContext>& context, const 
std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context);
+
+  context->getProperty(MaxBatchSize.getName(), max_batch_size_);
+  if (max_batch_size_ < 1)
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Max Batch Size property is 
invalid");
+
+  std::string host_url{};
+  if (auto hosts_str = context->getProperty(Hosts)) {
+    auto hosts = utils::StringUtils::split(*hosts_str, ",");
+    if (hosts.size() > 1)
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Multiple hosts not yet 
supported");
+    host_url = hosts[0];
+  } else {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing or invalid hosts");
+  }
+
+  auto credentials_service = getCredentialsService(*context);
+  if (!credentials_service)
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Missing Elasticsearch 
credentials service");
+
+  client_.initialize("POST", host_url + "/_bulk", 
getSSLContextService(*context));
+  client_.setContentType("application/json");

Review Comment:
   Great catch! I reverted PostElasticsearch so it won't reuse the HTTPClient,
   and I will create a ticket to make HTTPClient reusable instead of single-use only.
   
https://github.com/apache/nifi-minifi-cpp/pull/1349/commits/2f44b46f16a0d1be87788574216ac730811d84a1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to