http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/c2/C2Payload.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/c2/C2Payload.cpp b/libminifi/src/c2/C2Payload.cpp new file mode 100644 index 0000000..2a737d4 --- /dev/null +++ b/libminifi/src/c2/C2Payload.cpp @@ -0,0 +1,219 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "c2/C2Payload.h" +#include <utility> +#include <vector> +#include <string> + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { + +C2ContentResponse::C2ContentResponse(Operation op) + : op(op), + required(false), + delay(0), + ttl(-1) { +} + +C2ContentResponse::C2ContentResponse(const C2ContentResponse &other) + : op(other.op), + required(other.required), + delay(other.delay), + ttl(other.ttl), + name(other.name), + ident(other.ident), + operation_arguments(other.operation_arguments) { +} + +C2ContentResponse::C2ContentResponse(const C2ContentResponse &&other) + : op(other.op), + required(other.required), + delay(std::move(other.delay)), + ttl(std::move(other.ttl)), + ident(std::move(other.ident)), + name(std::move(other.name)), + operation_arguments(std::move(other.operation_arguments)) { +} + +C2ContentResponse &C2ContentResponse::operator=(const C2ContentResponse &&other) { + op = other.op; + required = other.required; + delay = std::move(other.delay); + ttl = std::move(other.ttl); + name = std::move(other.name); + ident = std::move(other.ident); + operation_arguments = std::move(other.operation_arguments); + return *this; +} + +C2ContentResponse &C2ContentResponse::operator=(const C2ContentResponse &other) { + op = other.op; + required = other.required; + delay = other.delay; + ttl = other.ttl; + name = other.name; + operation_arguments = other.operation_arguments; + return *this; +} + +C2Payload::C2Payload(Operation op, std::string identifier, bool resp, bool isRaw) + : state::Update(state::UpdateStatus(state::UpdateState::INITIATE, 0)), + op_(op), + raw_(isRaw), + ident_(identifier), + isResponse(resp) { +} + +C2Payload::C2Payload(Operation op, bool resp, bool isRaw) + : state::Update(state::UpdateStatus(state::UpdateState::INITIATE, 0)), + op_(op), + raw_(isRaw), + isResponse(resp) { +} + +C2Payload::C2Payload(Operation op, state::UpdateState state, bool resp, bool isRaw) + : state::Update(state::UpdateStatus(state, 0)), + op_(op), + raw_(isRaw), + isResponse(resp) { +} + +C2Payload::C2Payload(const C2Payload &other) + : state::Update(other), + isResponse(other.isResponse), + op_(other.op_), + raw_(other.raw_), + label_(other.label_), + ident_(other.ident_), + raw_data_(other.raw_data_), + payloads_(other.payloads_), + content_(other.content_) { +} + +C2Payload::C2Payload(const C2Payload &&other) + : state::Update(std::move(other)), + isResponse(other.isResponse), + op_(std::move(other.op_)), + raw_(other.raw_), + label_(std::move(other.label_)), + ident_(std::move(other.ident_)), + raw_data_(std::move(other.raw_data_)), + payloads_(std::move(other.payloads_)), + content_(std::move(other.content_)) { +} + +void C2Payload::setIdentifier(const std::string &ident) { + ident_ = ident; +} + +std::string C2Payload::getIdentifier() const { + return ident_; +} + +Operation C2Payload::getOperation() const { + return op_; +} + +bool C2Payload::validate() { + return true; +} + +const std::vector<C2ContentResponse> &C2Payload::getContent() const { + return content_; +} + +void C2Payload::addContent(const C2ContentResponse &&content) { + for (auto &existing_content : content_) { + if (existing_content.name == content.name) { + for (auto subcontent : existing_content.operation_arguments) { + } + + for (auto subcontent : content.operation_arguments) { + } + + existing_content.operation_arguments.insert(content.operation_arguments.begin(), content.operation_arguments.end()); + + for (auto subcontent : existing_content.operation_arguments) { + } + + return; + } + } + content_.push_back(std::move(content)); +} + +bool C2Payload::isRaw() const { + return raw_; +} + +void C2Payload::setRawData(const std::string &data) { + raw_data_ = data; +} + +void C2Payload::setRawData(const std::vector<char> &data) { + raw_data_ = std::string(data.data(), data.size()); +} + +std::string C2Payload::getRawData() const { + return raw_data_; +} + +void C2Payload::addPayload(const C2Payload &&payload) { + payloads_.push_back(std::move(payload)); +} +const std::vector<C2Payload> &C2Payload::getNestedPayloads() const { + return payloads_; +} + +C2Payload &C2Payload::operator=(const C2Payload &&other) { + state::Update::operator=(std::move(other)); + isResponse = other.isResponse; + op_ = std::move(other.op_); + raw_ = other.raw_; + if (raw_) { + raw_data_ = std::move(raw_data_); + } + label_ = std::move(other.label_); + payloads_ = std::move(other.payloads_); + content_ = std::move(other.content_); + return *this; +} + +C2Payload &C2Payload::operator=(const C2Payload &other) { + state::Update::operator=(other); + isResponse = other.isResponse; + op_ = other.op_; + raw_ = other.raw_; + if (raw_) { + raw_data_ = other.raw_data_; + } + label_ = other.label_; + payloads_ = other.payloads_; + content_ = other.content_; + return *this; +} + +} /* namespace c2 */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/c2/protocols/RESTProtocol.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/c2/protocols/RESTProtocol.cpp b/libminifi/src/c2/protocols/RESTProtocol.cpp new file mode 100644 index 0000000..c8babb3 --- /dev/null +++ b/libminifi/src/c2/protocols/RESTProtocol.cpp @@ -0,0 +1,177 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "c2/protocols/RESTProtocol.h" + +#include <algorithm> +#include <memory> +#include <utility> +#include <map> +#include <string> +#include <vector> + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { + +const C2Payload RESTProtocol::parseJsonResponse(const C2Payload &payload, const std::vector<char> &response) { + Json::Reader reader; + Json::Value root; + try { + if (reader.parse(std::string(response.data(), response.size()), root)) { + std::string requested_operation = getOperation(payload); + + std::string identifier; + if (root.isMember("operationid")) { + identifier = root["operationid"].asString(); + } + if (root["operation"].asString() == requested_operation) { + if (root["requested_operations"].size() == 0) { + return std::move(C2Payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true)); + } + C2Payload new_payload(payload.getOperation(), state::UpdateState::NESTED, true); + + new_payload.setIdentifier(identifier); + + for (const Json::Value& request : root["requested_operations"]) { + Operation newOp = stringToOperation(request["operation"].asString()); + C2Payload nested_payload(newOp, state::UpdateState::READ_COMPLETE, true); + C2ContentResponse new_command(newOp); + new_command.delay = 0; + new_command.required = true; + new_command.ttl = -1; + // set the identifier if one exists + if (request.isMember("operationid")) { + new_command.ident = request["operationid"].asString(); + nested_payload.setIdentifier(new_command.ident); + } + new_command.name = request["name"].asString(); + + if (request.isMember("content") && request["content"].size() > 0) { + for (const auto &name : request["content"].getMemberNames()) { + new_command.operation_arguments[name] = request["content"][name].asString(); + } + } + nested_payload.addContent(std::move(new_command)); + new_payload.addPayload(std::move(nested_payload)); + } + // we have a response for this request + return std::move(new_payload); + } + } + } catch (...) { + } + return std::move(C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true)); +} + +Json::Value RESTProtocol::serializeJsonPayload(Json::Value &json_root, const C2Payload &payload) { + // get the name from the content + Json::Value json_payload; + std::map<std::string, std::vector<Json::Value>> children; + for (const auto &nested_payload : payload.getNestedPayloads()) { + Json::Value child_payload = serializeJsonPayload(json_payload, nested_payload); + children[nested_payload.getLabel()].push_back(child_payload); + } + for (auto child_vector : children) { + if (child_vector.second.size() > 1) { + Json::Value children_json(Json::arrayValue); + for (auto child : child_vector.second) { + json_payload[child_vector.first] = child; + } + } else { + if (child_vector.second.size() == 1) { + if (child_vector.second.at(0).isMember(child_vector.first)) { + json_payload[child_vector.first] = child_vector.second.at(0)[child_vector.first]; + } else { + json_payload[child_vector.first] = child_vector.second.at(0); + } + } + } + } + + const std::vector<C2ContentResponse> &content = payload.getContent(); + for (const auto &payload_content : content) { + Json::Value payload_content_values; + bool use_sub_option = true; + if (payload_content.op == payload.getOperation()) { + for (auto content : payload_content.operation_arguments) { + if (payload_content.operation_arguments.size() == 1 && payload_content.name == content.first) { + json_payload[payload_content.name] = content.second; + use_sub_option = false; + } else { + payload_content_values[content.first] = content.second; + } + } + } + if (use_sub_option) + json_payload[payload_content.name] = payload_content_values; + } + return json_payload; +} + +std::string RESTProtocol::getOperation(const C2Payload &payload) { + switch (payload.getOperation()) { + case Operation::ACKNOWLEDGE: + return "acknowledge"; + case Operation::HEARTBEAT: + return "heartbeat"; + case Operation::RESTART: + return "restart"; + case Operation::DESCRIBE: + return "describe"; + case Operation::STOP: + return "stop"; + case Operation::START: + return "start"; + case Operation::UPDATE: + return "update"; + default: + return "heartbeat"; + } +} + +Operation RESTProtocol::stringToOperation(const std::string str) { + std::string op = str; + std::transform(str.begin(), str.end(), op.begin(), ::tolower); + if (op == "heartbeat") { + return Operation::HEARTBEAT; + } else if (op == "acknowledge") { + return Operation::ACKNOWLEDGE; + } else if (op == "update") { + return Operation::UPDATE; + } else if (op == "describe") { + return Operation::DESCRIBE; + } else if (op == "restart") { + return Operation::RESTART; + } else if (op == "clear") { + return Operation::CLEAR; + } else if (op == "stop") { + return Operation::STOP; + } else if (op == "start") { + return Operation::START; + } + return Operation::HEARTBEAT; +} + +} /* namespace c2 */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/c2/protocols/RESTReceiver.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/c2/protocols/RESTReceiver.cpp b/libminifi/src/c2/protocols/RESTReceiver.cpp new file mode 100644 index 0000000..e79ffd7 --- /dev/null +++ b/libminifi/src/c2/protocols/RESTReceiver.cpp @@ -0,0 +1,148 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "c2/protocols/RESTReceiver.h" +#include <algorithm> +#include <memory> +#include <utility> +#include <map> +#include <string> +#include <vector> + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { + +int log_message(const struct mg_connection *conn, const char *message) { + puts(message); + return 1; +} + +int ssl_protocol_en(void *ssl_context, void *user_data) { + struct ssl_ctx_st *ctx = (struct ssl_ctx_st *) ssl_context; + return 0; +} + +RESTReceiver::RESTReceiver(std::string name, uuid_t uuid) + : HeartBeatReporter(name, uuid), + logger_(logging::LoggerFactory<RESTReceiver>::getLogger()) { +} + +void RESTReceiver::initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) { + HeartBeatReporter::initialize(controller, configure); + logger_->log_debug("Initializing rest receiveer"); + if (nullptr != configuration_) { + std::string listeningPort, rootUri, caCert; + configuration_->get("c2.rest.listener.port", listeningPort); + configuration_->get("c2.rest.listener.heartbeat.rooturi", rootUri); + configuration_->get("c2.rest.listener.cacert", caCert); + + if (!listeningPort.empty() && !rootUri.empty()) { + handler = std::unique_ptr<ListeningProtocol>(new ListeningProtocol()); + if (!caCert.empty()) { + listener = std::move(start_webserver(listeningPort, rootUri, dynamic_cast<CivetHandler*>(handler.get()), caCert)); + } else { + listener = std::move(start_webserver(listeningPort, rootUri, dynamic_cast<CivetHandler*>(handler.get()))); + } + } + } +} +int16_t RESTReceiver::heartbeat(const C2Payload &payload) { + std::string operation_request_str = getOperation(payload); + std::string outputConfig; + Json::Value json_payload; + json_payload["operation"] = operation_request_str; + if (payload.getIdentifier().length() > 0) { + json_payload["operationid"] = payload.getIdentifier(); + } + const std::vector<C2ContentResponse> &content = payload.getContent(); + + for (const auto &payload_content : content) { + Json::Value payload_content_values; + bool use_sub_option = true; + if (payload_content.op == payload.getOperation()) { + for (auto content : payload_content.operation_arguments) { + if (payload_content.operation_arguments.size() == 1 && payload_content.name == content.first) { + json_payload[payload_content.name] = content.second; + use_sub_option = false; + } else { + payload_content_values[content.first] = content.second; + } + } + } + if (use_sub_option) + json_payload[payload_content.name] = payload_content_values; + } + + for (const auto &nested_payload : payload.getNestedPayloads()) { + json_payload[nested_payload.getLabel()] = serializeJsonPayload(json_payload, nested_payload); + } + + Json::StyledWriter writer; + outputConfig = writer.write(json_payload); + if (handler != nullptr) { + logger_->log_debug("Setting %s", outputConfig); + handler->setResponse(outputConfig); + } + + return 0; +} + +std::unique_ptr<CivetServer> RESTReceiver::start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler, std::string &ca_cert) { + struct mg_callbacks callback; + + memset(&callback, 0, sizeof(callback)); + callback.init_ssl = ssl_protocol_en; + std::string my_port = port; + my_port += "s"; + callback.log_message = log_message; + const char *options[] = { "listening_ports", port.c_str(), "ssl_certificate", ca_cert.c_str(), "ssl_protocol_version", "0", "ssl_cipher_list", "ALL", + "ssl_verify_peer", "no", "num_threads", "1", 0 }; + + std::vector<std::string> cpp_options; + for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) { + cpp_options.push_back(options[i]); + } + std::unique_ptr<CivetServer> server = std::unique_ptr<CivetServer>(new CivetServer(cpp_options)); + + server->addHandler(rooturi, handler); + + return server; +} + +std::unique_ptr<CivetServer> RESTReceiver::start_webserver(const std::string &port, std::string &rooturi, CivetHandler *handler) { + const char *options[] = { "document_root", ".", "listening_ports", port.c_str(), "num_threads", "1", 0 }; + + std::vector<std::string> cpp_options; + for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) { + cpp_options.push_back(options[i]); + } + std::unique_ptr<CivetServer> server = std::unique_ptr<CivetServer>(new CivetServer(cpp_options)); + + server->addHandler(rooturi, handler); + + return server; +} + +} /* namespace c2 */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/c2/protocols/RESTSender.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/c2/protocols/RESTSender.cpp b/libminifi/src/c2/protocols/RESTSender.cpp new file mode 100644 index 0000000..e15522b --- /dev/null +++ b/libminifi/src/c2/protocols/RESTSender.cpp @@ -0,0 +1,144 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "c2/protocols/RESTSender.h" + +#include <algorithm> +#include <memory> +#include <utility> +#include <map> +#include <string> +#include <vector> + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace c2 { + +RESTSender::RESTSender(std::string name, uuid_t uuid) + : C2Protocol(name, uuid), + logger_(logging::LoggerFactory<Connectable>::getLogger()) { +} + +void RESTSender::initialize(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<Configure> &configure) { + C2Protocol::initialize(controller, configure); + // base URL when one is not specified. + if (nullptr != configure) { + configure->get("c2.rest.url", rest_uri_); + configure->get("c2.rest.url.ack", ack_uri_); + } + logger_->log_info("Submitting to %s", rest_uri_); +} +C2Payload RESTSender::consumePayload(const std::string &url, const C2Payload &payload, Direction direction, bool async) { + std::string operation_request_str = getOperation(payload); + std::string outputConfig; + if (direction == Direction::TRANSMIT) { + Json::Value json_payload; + json_payload["operation"] = operation_request_str; + if (payload.getIdentifier().length() > 0) { + json_payload["operationid"] = payload.getIdentifier(); + } + const std::vector<C2ContentResponse> &content = payload.getContent(); + + for (const auto &payload_content : content) { + Json::Value payload_content_values; + bool use_sub_option = true; + if (payload_content.op == payload.getOperation()) { + for (auto content : payload_content.operation_arguments) { + if (payload_content.operation_arguments.size() == 1 && payload_content.name == content.first) { + json_payload[payload_content.name] = content.second; + use_sub_option = false; + } else { + payload_content_values[content.first] = content.second; + } + } + } + if (use_sub_option) + json_payload[payload_content.name] = payload_content_values; + } + + for (const auto &nested_payload : payload.getNestedPayloads()) { + json_payload[nested_payload.getLabel()] = serializeJsonPayload(json_payload, nested_payload); + } + + Json::StyledWriter writer; + outputConfig = writer.write(json_payload); + } + + return std::move(sendPayload(url, direction, payload, outputConfig)); +} + +C2Payload RESTSender::consumePayload(const C2Payload &payload, Direction direction, bool async) { + if (payload.getOperation() == ACKNOWLEDGE) { + return consumePayload(ack_uri_, payload, direction, async); + } + return consumePayload(rest_uri_, payload, direction, async); +} + +void RESTSender::update(const std::shared_ptr<Configure> &configure) { + std::string url; + configure->get("c2.rest.url", url); + configure->get("c2.rest.url.ack", url); +} + +const C2Payload RESTSender::sendPayload(const std::string url, const Direction direction, const C2Payload &payload, const std::string outputConfig) { + if (!url.empty()) { + utils::HTTPClient client(url, ssl_context_service_); + client.setConnectionTimeout(2); + + std::unique_ptr<utils::ByteInputCallBack> input = nullptr; + std::unique_ptr<utils::HTTPUploadCallback> callback = nullptr; + if (direction == Direction::TRANSMIT) { + input = std::unique_ptr<utils::ByteInputCallBack>(new utils::ByteInputCallBack()); + callback = std::unique_ptr<utils::HTTPUploadCallback>(new utils::HTTPUploadCallback); + input->write(outputConfig); + callback->ptr = input.get(); + callback->pos = 0; + client.set_request_method("POST"); + client.setUploadCallback(callback.get()); + } else { + // we do not need to set the uplaod callback + // since we are not uploading anything on a get + client.set_request_method("GET"); + } + client.setContentType("application/json"); + bool isOkay = client.submit(); + int64_t respCode = client.getResponseCode(); + + if (isOkay && respCode) { + if (payload.isRaw()) { + C2Payload response_payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true, true); + + response_payload.setRawData(client.getResponseBody()); + return std::move(response_payload); + } + return parseJsonResponse(payload, client.getResponseBody()); + } else { + return C2Payload(payload.getOperation(), state::UpdateState::READ_ERROR, true); + } + } else { + return C2Payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true); + } +} + +} /* namespace c2 */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/controllers/SSLContextService.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/controllers/SSLContextService.cpp b/libminifi/src/controllers/SSLContextService.cpp index 95ccbb0..e8dc520 100644 --- a/libminifi/src/controllers/SSLContextService.cpp +++ b/libminifi/src/controllers/SSLContextService.cpp @@ -35,11 +35,13 @@ void SSLContextService::initialize() { if (initialized_) return; - std::lock_guard < std::mutex > lock(initialization_mutex_); + std::lock_guard<std::mutex> lock(initialization_mutex_); ControllerService::initialize(); initializeTLS(); + + initialized_ = true; } std::unique_ptr<SSLContext> SSLContextService::createSSLContext() { @@ -65,8 +67,7 @@ std::unique_ptr<SSLContext> SSLContextService::createSSLContext() { if (!IsNullOrEmpty(private_key_)) { int retp = SSL_CTX_use_PrivateKey_file(ctx, private_key_.c_str(), SSL_FILETYPE_PEM); if (retp != 1) { - logger_->log_error("Could not create load private key,%i on %s error : %s", retp, private_key_, - std::strerror(errno)); + logger_->log_error("Could not create load private key,%i on %s error : %s", retp, private_key_, std::strerror(errno)); return nullptr; } @@ -80,31 +81,31 @@ std::unique_ptr<SSLContext> SSLContextService::createSSLContext() { if (retp == 0) { logger_->log_error("Can not load CA certificate, Exiting, error : %s", std::strerror(errno)); } - return std::unique_ptr < SSLContext > (new SSLContext(ctx)); + return std::unique_ptr<SSLContext>(new SSLContext(ctx)); } const std::string &SSLContextService::getCertificateFile() { - std::lock_guard < std::mutex > lock(initialization_mutex_); + std::lock_guard<std::mutex> lock(initialization_mutex_); return certificate; } const std::string &SSLContextService::getPassphrase() { - std::lock_guard < std::mutex > lock(initialization_mutex_); + std::lock_guard<std::mutex> lock(initialization_mutex_); return passphrase_; } const std::string &SSLContextService::getPassphraseFile() { - std::lock_guard < std::mutex > lock(initialization_mutex_); + std::lock_guard<std::mutex> lock(initialization_mutex_); return passphrase_file_; } const std::string &SSLContextService::getPrivateKeyFile() { - std::lock_guard < std::mutex > lock(initialization_mutex_); + std::lock_guard<std::mutex> lock(initialization_mutex_); return private_key_; } const std::string &SSLContextService::getCACertificate() { - std::lock_guard < std::mutex > lock(initialization_mutex_); + std::lock_guard<std::mutex> lock(initialization_mutex_); return ca_certificate_; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/core/ClassLoader.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ClassLoader.cpp b/libminifi/src/core/ClassLoader.cpp index fbd46f6..9bead0e 100644 --- a/libminifi/src/core/ClassLoader.cpp +++ b/libminifi/src/core/ClassLoader.cpp @@ -43,7 +43,7 @@ uint16_t ClassLoader::registerResource(const std::string &resource) { logger_->log_error("Cannot load library: %s", dlerror()); return RESOURCE_FAILURE; } else { - std::lock_guard < std::mutex > lock(internal_mutex_); + std::lock_guard<std::mutex> lock(internal_mutex_); dl_handles_.push_back(resource_ptr); } @@ -60,9 +60,9 @@ uint16_t ClassLoader::registerResource(const std::string &resource) { ObjectFactory *factory = create_factory_func(); - std::lock_guard < std::mutex > lock(internal_mutex_); + std::lock_guard<std::mutex> lock(internal_mutex_); - loaded_factories_[factory->getClassName()] = std::unique_ptr < ObjectFactory > (factory); + loaded_factories_[factory->getClassName()] = std::unique_ptr<ObjectFactory>(factory); return RESOURCE_SUCCESS; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/core/ConfigurableComponent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ConfigurableComponent.cpp b/libminifi/src/core/ConfigurableComponent.cpp index 62a08db..b9a3a79 100644 --- a/libminifi/src/core/ConfigurableComponent.cpp +++ b/libminifi/src/core/ConfigurableComponent.cpp @@ -43,7 +43,7 @@ ConfigurableComponent::~ConfigurableComponent() { } bool ConfigurableComponent::getProperty(const std::string &name, Property &prop) { - std::lock_guard < std::mutex > lock(configuration_mutex_); + std::lock_guard<std::mutex> lock(configuration_mutex_); auto &&it = properties_.find(name); @@ -62,13 +62,13 @@ bool ConfigurableComponent::getProperty(const std::string &name, Property &prop) * @return result of getting property. */ bool ConfigurableComponent::getProperty(const std::string name, std::string &value) { - std::lock_guard < std::mutex > lock(configuration_mutex_); + std::lock_guard<std::mutex> lock(configuration_mutex_); auto &&it = properties_.find(name); if (it != properties_.end()) { Property item = it->second; value = item.getValue(); - logger_->log_info("Processor %s property name %s value %s", name, item.getName(), value); + logger_->log_info("Component %s property name %s value %s", name, item.getName(), value); return true; } else { return false; @@ -81,7 +81,7 @@ bool ConfigurableComponent::getProperty(const std::string name, std::string &val * @return result of setting property. */ bool ConfigurableComponent::setProperty(const std::string name, std::string value) { - std::lock_guard < std::mutex > lock(configuration_mutex_); + std::lock_guard<std::mutex> lock(configuration_mutex_); auto &&it = properties_.find(name); if (it != properties_.end()) { @@ -102,7 +102,7 @@ bool ConfigurableComponent::setProperty(const std::string name, std::string valu * @return result of setting property. */ bool ConfigurableComponent::updateProperty(const std::string &name, const std::string &value) { - std::lock_guard < std::mutex > lock(configuration_mutex_); + std::lock_guard<std::mutex> lock(configuration_mutex_); auto &&it = properties_.find(name); if (it != properties_.end()) { @@ -123,7 +123,7 @@ bool ConfigurableComponent::updateProperty(const std::string &name, const std::s * @return whether property was set or not */ bool ConfigurableComponent::setProperty(Property &prop, std::string value) { - std::lock_guard < std::mutex > lock(configuration_mutex_); + std::lock_guard<std::mutex> lock(configuration_mutex_); auto it = properties_.find(prop.getName()); if (it != properties_.end()) { @@ -151,7 +151,7 @@ bool ConfigurableComponent::setSupportedProperties(std::set<Property> properties return false; } - std::lock_guard < std::mutex > lock(configuration_mutex_); + std::lock_guard<std::mutex> lock(configuration_mutex_); properties_.clear(); for (auto item : properties) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/core/ConfigurationFactory.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ConfigurationFactory.cpp b/libminifi/src/core/ConfigurationFactory.cpp index 0a0e911..1640380 100644 --- a/libminifi/src/core/ConfigurationFactory.cpp +++ b/libminifi/src/core/ConfigurationFactory.cpp @@ -49,22 +49,22 @@ std::unique_ptr<core::FlowConfiguration> createFlowConfiguration(std::shared_ptr if (class_name_lc == "flowconfiguration") { // load the base configuration. - return std::unique_ptr < core::FlowConfiguration > (new core::FlowConfiguration(repo, flow_file_repo, content_repo, stream_factory, configure, path)); + return std::unique_ptr<core::FlowConfiguration>(new core::FlowConfiguration(repo, flow_file_repo, content_repo, stream_factory, configure, path)); } else if (class_name_lc == "yamlconfiguration") { // only load if the class is defined. - return std::unique_ptr < core::FlowConfiguration > (instantiate<core::YamlConfiguration>(repo, flow_file_repo, content_repo, stream_factory, configure, path)); + return std::unique_ptr<core::FlowConfiguration>(instantiate<core::YamlConfiguration>(repo, flow_file_repo, content_repo, stream_factory, configure, path)); } else { if (fail_safe) { - return std::unique_ptr < core::FlowConfiguration > (new core::FlowConfiguration(repo, flow_file_repo, content_repo, stream_factory, configure, path)); + return std::unique_ptr<core::FlowConfiguration>(new core::FlowConfiguration(repo, flow_file_repo, content_repo, stream_factory, configure, path)); } else { throw std::runtime_error("Support for the provided configuration class could not be found"); } } } catch (const std::runtime_error &r) { if (fail_safe) { - return std::unique_ptr < core::FlowConfiguration > (new core::FlowConfiguration(repo, flow_file_repo, content_repo, stream_factory, configure, path)); + return std::unique_ptr<core::FlowConfiguration>(new core::FlowConfiguration(repo, flow_file_repo, content_repo, stream_factory, configure, path)); } } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/core/Connectable.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/Connectable.cpp b/libminifi/src/core/Connectable.cpp index 9c3b26a..cf01f0c 100644 --- a/libminifi/src/core/Connectable.cpp +++ b/libminifi/src/core/Connectable.cpp @@ -53,7 +53,7 @@ bool Connectable::setSupportedRelationships(std::set<core::Relationship> relatio return false; } - std::lock_guard < std::mutex > lock(relationship_mutex_); + std::lock_guard<std::mutex> lock(relationship_mutex_); relationships_.clear(); for (auto item : relationships) { @@ -67,7 +67,7 @@ bool Connectable::setSupportedRelationships(std::set<core::Relationship> relatio bool Connectable::isSupportedRelationship(core::Relationship relationship) { const bool requiresLock = isRunning(); - const auto conditionalLock = !requiresLock ? std::unique_lock<std::mutex>() : std::unique_lock < std::mutex > (relationship_mutex_); + const auto conditionalLock = !requiresLock ? std::unique_lock<std::mutex>() : std::unique_lock<std::mutex>(relationship_mutex_); const auto &it = relationships_.find(relationship.getName()); if (it != relationships_.end()) { @@ -83,7 +83,7 @@ bool Connectable::setAutoTerminatedRelationships(std::set<Relationship> relation return false; } - std::lock_guard < std::mutex > lock(relationship_mutex_); + std::lock_guard<std::mutex> lock(relationship_mutex_); auto_terminated_relationships_.clear(); for (auto item : relationships) { @@ -97,7 +97,7 @@ bool Connectable::setAutoTerminatedRelationships(std::set<Relationship> relation bool Connectable::isAutoTerminated(core::Relationship relationship) { const bool requiresLock = isRunning(); - const auto conditionalLock = !requiresLock ? std::unique_lock<std::mutex>() : std::unique_lock < std::mutex > (relationship_mutex_); + const auto conditionalLock = !requiresLock ? std::unique_lock<std::mutex>() : std::unique_lock<std::mutex>(relationship_mutex_); const auto &it = auto_terminated_relationships_.find(relationship.getName()); if (it != auto_terminated_relationships_.end()) { @@ -111,7 +111,7 @@ void Connectable::waitForWork(uint64_t timeoutMs) { has_work_.store(isWorkAvailable()); if (!has_work_.load()) { - std::unique_lock < std::mutex > lock(work_available_mutex_); + std::unique_lock<std::mutex> lock(work_available_mutex_); work_condition_.wait_for(lock, std::chrono::milliseconds(timeoutMs), [&] {return has_work_.load();}); } } @@ -143,7 +143,7 @@ std::set<std::shared_ptr<Connectable>> Connectable::getOutGoingConnections(std:: } std::shared_ptr<Connectable> Connectable::getNextIncomingConnection() { - std::lock_guard < std::mutex > lock(relationship_mutex_); + std::lock_guard<std::mutex> lock(relationship_mutex_); if (_incomingConnections.size() == 0) return NULL; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/core/FlowConfiguration.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp index e8e7462..9ce7146 100644 --- a/libminifi/src/core/FlowConfiguration.cpp +++ b/libminifi/src/core/FlowConfiguration.cpp @@ -35,11 +35,12 @@ std::shared_ptr<core::Processor> FlowConfiguration::createProcessor(std::string if (nullptr == ptr) { logger_->log_error("No Processor defined for %s", name.c_str()); } - std::shared_ptr<core::Processor> processor = std::static_pointer_cast < core::Processor > (ptr); + std::shared_ptr<core::Processor> processor = std::static_pointer_cast<core::Processor>(ptr); // initialize the processor processor->initialize(); + processor->setStreamFactory(stream_factory_); return processor; } @@ -54,15 +55,15 @@ std::shared_ptr<core::Processor> FlowConfiguration::createProvenanceReportTask() } std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRootProcessGroup(std::string name, uuid_t uuid, int version) { - return std::unique_ptr < core::ProcessGroup > (new core::ProcessGroup(core::ROOT_PROCESS_GROUP, name, uuid, version)); + return std::unique_ptr<core::ProcessGroup>(new core::ProcessGroup(core::ROOT_PROCESS_GROUP, name, uuid, version)); } std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRemoteProcessGroup(std::string name, uuid_t uuid) { - return std::unique_ptr < core::ProcessGroup > (new core::ProcessGroup(core::REMOTE_PROCESS_GROUP, name, uuid)); + return std::unique_ptr<core::ProcessGroup>(new core::ProcessGroup(core::REMOTE_PROCESS_GROUP, name, uuid)); } std::shared_ptr<minifi::Connection> FlowConfiguration::createConnection(std::string name, uuid_t uuid) { - return std::make_shared < minifi::Connection > (flow_file_repo_, content_repo_, name, uuid); + return std::make_shared<minifi::Connection>(flow_file_repo_, content_repo_, name, uuid); } std::shared_ptr<core::controller::ControllerServiceNode> FlowConfiguration::createControllerService(const std::string &class_name, const std::string &name, uuid_t uuid) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/core/FlowFile.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/FlowFile.cpp b/libminifi/src/core/FlowFile.cpp index 6afd0fe..d08ea4b 100644 --- a/libminifi/src/core/FlowFile.cpp +++ b/libminifi/src/core/FlowFile.cpp @@ -45,6 +45,7 @@ FlowFile::FlowFile() connection_(nullptr), original_connection_() { entry_date_ = getTimeMillis(); + event_time_ = entry_date_; lineage_start_date_ = entry_date_; char uuidStr[37] = { 0 }; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/core/ProcessGroup.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp index db0fe08..a537f1a 100644 --- a/libminifi/src/core/ProcessGroup.cpp +++ b/libminifi/src/core/ProcessGroup.cpp @@ -69,12 +69,12 @@ ProcessGroup::~ProcessGroup() { } bool ProcessGroup::isRootProcessGroup() { - std::lock_guard < std::recursive_mutex > lock(mutex_); + std::lock_guard<std::recursive_mutex> lock(mutex_); return (type_ == ROOT_PROCESS_GROUP); } void ProcessGroup::addProcessor(std::shared_ptr<Processor> processor) { - std::lock_guard < std::recursive_mutex > lock(mutex_); + std::lock_guard<std::recursive_mutex> lock(mutex_); if (processors_.find(processor) == processors_.end()) { // We do not have the same processor in this process group yet @@ -84,7 +84,7 @@ void ProcessGroup::addProcessor(std::shared_ptr<Processor> processor) { } void ProcessGroup::removeProcessor(std::shared_ptr<Processor> processor) { - std::lock_guard < std::recursive_mutex > lock(mutex_); + std::lock_guard<std::recursive_mutex> lock(mutex_); if (processors_.find(processor) != processors_.end()) { // We do have the same processor in this process group yet @@ -94,7 +94,7 @@ void ProcessGroup::removeProcessor(std::shared_ptr<Processor> processor) { } void ProcessGroup::addProcessGroup(ProcessGroup *child) { - std::lock_guard < std::recursive_mutex > lock(mutex_); + std::lock_guard<std::recursive_mutex> lock(mutex_); if (child_process_groups_.find(child) == child_process_groups_.end()) { // We do not have the same child process group in this process group yet @@ -104,7 +104,7 @@ void ProcessGroup::addProcessGroup(ProcessGroup *child) { } void ProcessGroup::removeProcessGroup(ProcessGroup *child) { - std::lock_guard < std::recursive_mutex > lock(mutex_); + std::lock_guard<std::recursive_mutex> lock(mutex_); if (child_process_groups_.find(child) != child_process_groups_.end()) { // We do have the same child process group in this process group yet @@ -114,7 +114,7 @@ void ProcessGroup::removeProcessGroup(ProcessGroup *child) { } void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler, EventDrivenSchedulingAgent *eventScheduler) { - std::lock_guard < std::recursive_mutex > lock(mutex_); + std::lock_guard<std::recursive_mutex> lock(mutex_); try { // Start all the processor node, input and output ports @@ -142,7 +142,7 @@ void ProcessGroup::startProcessing(TimerDrivenSchedulingAgent *timeScheduler, Ev } void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler, EventDrivenSchedulingAgent *eventScheduler) { - std::lock_guard < std::recursive_mutex > lock(mutex_); + std::lock_guard<std::recursive_mutex> lock(mutex_); try { // Stop all the processor node, input and output ports @@ -168,7 +168,7 @@ void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler, Eve } std::shared_ptr<Processor> ProcessGroup::findProcessor(uuid_t uuid) { - std::lock_guard < std::recursive_mutex > lock(mutex_); + std::lock_guard<std::recursive_mutex> lock(mutex_); std::shared_ptr<Processor> ret = NULL; for (auto processor : processors_) { logger_->log_info("find processor %s", processor->getName().c_str()); @@ -207,8 +207,21 @@ std::shared_ptr<core::controller::ControllerServiceNode> ProcessGroup::findContr return controller_service_map_.getControllerServiceNode(nodeId); } +void ProcessGroup::getAllProcessors(std::vector<std::shared_ptr<Processor>> &processor_vec) { + std::lock_guard<std::recursive_mutex> lock(mutex_); + std::shared_ptr<Processor> ret = NULL; + + for (auto processor : processors_) { + logger_->log_debug("Current processor is %s", processor->getName().c_str()); + processor_vec.push_back(processor); + } + for (auto processGroup : child_process_groups_) { + processGroup->getAllProcessors(processor_vec); + } +} + std::shared_ptr<Processor> ProcessGroup::findProcessor(const std::string &processorName) { - std::lock_guard < std::recursive_mutex > lock(mutex_); + std::lock_guard<std::recursive_mutex> lock(mutex_); std::shared_ptr<Processor> ret = NULL; for (auto processor : processors_) { logger_->log_debug("Current processor is %s", processor->getName().c_str()); @@ -224,7 +237,7 @@ std::shared_ptr<Processor> ProcessGroup::findProcessor(const std::string &proces } void ProcessGroup::updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue) { - std::lock_guard < std::recursive_mutex > lock(mutex_); + std::lock_guard<std::recursive_mutex> lock(mutex_); for (auto processor : processors_) { if (processor->getName() == processorName) { processor->setProperty(propertyName, propertyValue); @@ -239,6 +252,7 @@ void ProcessGroup::updatePropertyValue(std::string processorName, std::string pr void ProcessGroup::getConnections(std::map<std::string, std::shared_ptr<Connection>> &connectionMap) { for (auto connection : connections_) { connectionMap[connection->getUUIDStr()] = connection; + connectionMap[connection->getName()] = connection; } for (auto processGroup : child_process_groups_) { processGroup->getConnections(connectionMap); @@ -246,7 +260,7 @@ void ProcessGroup::getConnections(std::map<std::string, std::shared_ptr<Connecti } void ProcessGroup::addConnection(std::shared_ptr<Connection> connection) { - std::lock_guard < std::recursive_mutex > lock(mutex_); + std::lock_guard<std::recursive_mutex> lock(mutex_); if (connections_.find(connection) == connections_.end()) { // We do not have the same connection in this process group yet @@ -268,7 +282,7 @@ void ProcessGroup::addConnection(std::shared_ptr<Connection> connection) { } void ProcessGroup::removeConnection(std::shared_ptr<Connection> connection) { - std::lock_guard < std::recursive_mutex > lock(mutex_); + std::lock_guard<std::recursive_mutex> lock(mutex_); if (connections_.find(connection) != connections_.end()) { // We do not have the same connection in this process group yet http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/core/ProcessSession.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp index b3035cb..e3799e1 100644 --- a/libminifi/src/core/ProcessSession.cpp +++ b/libminifi/src/core/ProcessSession.cpp @@ -44,7 +44,7 @@ std::shared_ptr<core::FlowFile> ProcessSession::create() { _addedFlowFiles[record->getUUIDStr()] = record; logger_->log_debug("Create FlowFile with UUID %s", record->getUUIDStr().c_str()); std::stringstream details; - details << process_context_->getProcessorNode().getName() << " creates flow record " << record->getUUIDStr(); + details << process_context_->getProcessorNode()->getName() << " creates flow record " << record->getUUIDStr(); provenance_report_->create(record, details.str()); return record; @@ -160,52 +160,54 @@ std::shared_ptr<core::FlowFile> ProcessSession::clone(std::shared_ptr<core::Flow void ProcessSession::remove(std::shared_ptr<core::FlowFile> &flow) { flow->setDeleted(true); + process_context_->getFlowFileRepository()->Delete(flow->getUUIDStr()); _deletedFlowFiles[flow->getUUIDStr()] = flow; - std::string reason = process_context_->getProcessorNode().getName() + " drop flow record " + flow->getUUIDStr(); + std::string reason = process_context_->getProcessorNode()->getName() + " drop flow record " + flow->getUUIDStr(); provenance_report_->drop(flow, reason); } void ProcessSession::remove(std::shared_ptr<core::FlowFile> &&flow) { flow->setDeleted(true); + process_context_->getFlowFileRepository()->Delete(flow->getUUIDStr()); _deletedFlowFiles[flow->getUUIDStr()] = flow; - std::string reason = process_context_->getProcessorNode().getName() + " drop flow record " + flow->getUUIDStr(); + std::string reason = process_context_->getProcessorNode()->getName() + " drop flow record " + flow->getUUIDStr(); provenance_report_->drop(flow, reason); } void ProcessSession::putAttribute(std::shared_ptr<core::FlowFile> &flow, std::string key, std::string value) { flow->setAttribute(key, value); std::stringstream details; - details << process_context_->getProcessorNode().getName() << " modify flow record " << flow->getUUIDStr() << " attribute " << key << ":" << value; + details << process_context_->getProcessorNode()->getName() << " modify flow record " << flow->getUUIDStr() << " attribute " << key << ":" << value; provenance_report_->modifyAttributes(flow, details.str()); } void ProcessSession::removeAttribute(std::shared_ptr<core::FlowFile> &flow, std::string key) { flow->removeAttribute(key); std::stringstream details; - details << process_context_->getProcessorNode().getName() << " remove flow record " << flow->getUUIDStr() << " attribute " + key; + details << process_context_->getProcessorNode()->getName() << " remove flow record " << flow->getUUIDStr() << " attribute " + key; provenance_report_->modifyAttributes(flow, details.str()); } void ProcessSession::putAttribute(std::shared_ptr<core::FlowFile> &&flow, std::string key, std::string value) { flow->setAttribute(key, value); std::stringstream details; - details << process_context_->getProcessorNode().getName() << " modify flow record " << flow->getUUIDStr() << " attribute " << key << ":" << value; + details << process_context_->getProcessorNode()->getName() << " modify flow record " << flow->getUUIDStr() << " attribute " << key << ":" << value; provenance_report_->modifyAttributes(flow, details.str()); } void ProcessSession::removeAttribute(std::shared_ptr<core::FlowFile> &&flow, std::string key) { flow->removeAttribute(key); std::stringstream details; - details << process_context_->getProcessorNode().getName() << " remove flow record " << flow->getUUIDStr() << " attribute " << key; + details << process_context_->getProcessorNode()->getName() << " remove flow record " << flow->getUUIDStr() << " attribute " << key; provenance_report_->modifyAttributes(flow, details.str()); } void ProcessSession::penalize(std::shared_ptr<core::FlowFile> &flow) { - flow->setPenaltyExpiration(getTimeMillis() + process_context_->getProcessorNode().getPenalizationPeriodMsec()); + flow->setPenaltyExpiration(getTimeMillis() + process_context_->getProcessorNode()->getPenalizationPeriodMsec()); } void ProcessSession::penalize(std::shared_ptr<core::FlowFile> &&flow) { - flow->setPenaltyExpiration(getTimeMillis() + process_context_->getProcessorNode().getPenalizationPeriodMsec()); + flow->setPenaltyExpiration(getTimeMillis() + process_context_->getProcessorNode()->getPenalizationPeriodMsec()); } void ProcessSession::transfer(std::shared_ptr<core::FlowFile> &flow, Relationship relationship) { @@ -222,7 +224,6 @@ void ProcessSession::write(std::shared_ptr<core::FlowFile> &flow, OutputStreamCa try { uint64_t startTime = getTimeMillis(); claim->increaseFlowFileRecordOwnedCount(); -// fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc); std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim); // Call the callback to write the content if (nullptr == stream) { @@ -244,12 +245,9 @@ void ProcessSession::write(std::shared_ptr<core::FlowFile> &flow, OutputStreamCa } flow->setResourceClaim(claim); - /* - logger_->log_debug("Write offset %d length %d into content %s for FlowFile UUID %s", - flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */ stream->closeStream(); std::stringstream details; - details << process_context_->getProcessorNode().getName() << " modify flow record content " << flow->getUUIDStr(); + details << process_context_->getProcessorNode()->getName() << " modify flow record content " << flow->getUUIDStr(); uint64_t endTime = getTimeMillis(); provenance_report_->modifyContent(flow, details.str(), endTime - startTime); } catch (std::exception &exception) { @@ -295,7 +293,7 @@ void ProcessSession::write(std::shared_ptr<core::FlowFile> &&flow, OutputStreamC flow->setResourceClaim(claim); std::stringstream details; - details << process_context_->getProcessorNode().getName() << " modify flow record content " << flow->getUUIDStr(); + details << process_context_->getProcessorNode()->getName() << " modify flow record content " << flow->getUUIDStr(); uint64_t endTime = getTimeMillis(); provenance_report_->modifyContent(flow, details.str(), endTime - startTime); } catch (std::exception &exception) { @@ -342,7 +340,7 @@ void ProcessSession::append(std::shared_ptr<core::FlowFile> &&flow, OutputStream uint64_t appendSize = stream->getSize() - oldPos; flow->setSize(flow->getSize() + appendSize); std::stringstream details; - details << process_context_->getProcessorNode().getName() << " modify flow record content " << flow->getUUIDStr(); + details << process_context_->getProcessorNode()->getName() << " modify flow record content " << flow->getUUIDStr(); uint64_t endTime = getTimeMillis(); provenance_report_->modifyContent(flow, details.str(), endTime - startTime); } catch (std::exception &exception) { @@ -382,7 +380,7 @@ void ProcessSession::append(std::shared_ptr<core::FlowFile> &flow, OutputStreamC flow->setSize(flow->getSize() + appendSize); std::stringstream details; - details << process_context_->getProcessorNode().getName() << " modify flow record content " << flow->getUUIDStr(); + details << process_context_->getProcessorNode()->getName() << " modify flow record content " << flow->getUUIDStr(); uint64_t endTime = getTimeMillis(); provenance_report_->modifyContent(flow, details.str(), endTime - startTime); } catch (std::exception &exception) { @@ -440,12 +438,14 @@ void ProcessSession::read(std::shared_ptr<core::FlowFile> &&flow, InputStreamCal std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->read(claim); if (nullptr == stream) { + logger_->log_info("claim does not exist"); rollback(); return; } stream->seek(flow->getOffset()); if (callback->process(stream) < 0) { + logger_->log_info("no data written from stream"); rollback(); return; } @@ -511,7 +511,7 @@ void ProcessSession::importFrom(io::DataStream &stream, std::shared_ptr<core::Fl content_stream->closeStream(); std::stringstream details; - details << process_context_->getProcessorNode().getName() << " modify flow record content " << flow->getUUIDStr(); + details << process_context_->getProcessorNode()->getName() << " modify flow record content " << flow->getUUIDStr(); uint64_t endTime = getTimeMillis(); provenance_report_->modifyContent(flow, details.str(), endTime - startTime); } catch (std::exception &exception) { @@ -585,7 +585,7 @@ void ProcessSession::import(std::string source, std::shared_ptr<core::FlowFile> if (!keepSource) std::remove(source.c_str()); std::stringstream details; - details << process_context_->getProcessorNode().getName() << " modify flow record content " << flow->getUUIDStr(); + details << process_context_->getProcessorNode()->getName() << " modify flow record content " << flow->getUUIDStr(); uint64_t endTime = getTimeMillis(); provenance_report_->modifyContent(flow, details.str(), endTime - startTime); } else { @@ -617,7 +617,7 @@ void ProcessSession::import(std::string source, std::shared_ptr<core::FlowFile> } } -void ProcessSession::import(std::string source, std::vector<std::shared_ptr<FlowFileRecord>> flows, bool keepSource, uint64_t offset, char inputDelimiter) { +void ProcessSession::import(std::string source, std::vector<std::shared_ptr<FlowFileRecord>> &flows, bool keepSource, uint64_t offset, char inputDelimiter) { std::shared_ptr<ResourceClaim> claim; std::shared_ptr<FlowFileRecord> flowFile; @@ -640,6 +640,7 @@ void ProcessSession::import(std::string source, std::vector<std::shared_ptr<Flow uint64_t startTime = getTimeMillis(); input.getline(buf, size, inputDelimiter); + size_t bufsize = strlen(buf); std::shared_ptr<io::BaseStream> stream = process_context_->getContentRepository()->write(claim); if (nullptr == stream) { logger_->log_debug("Stream is null"); @@ -648,7 +649,7 @@ void ProcessSession::import(std::string source, std::vector<std::shared_ptr<Flow } if (input) { - if (stream->write(reinterpret_cast<uint8_t*>(buf), size) < 0) { + if (stream->write(reinterpret_cast<uint8_t*>(buf), bufsize) < 0) { invalidWrite = true; break; } @@ -670,13 +671,11 @@ void ProcessSession::import(std::string source, std::vector<std::shared_ptr<Flow flowFile->setResourceClaim(claim); claim->increaseFlowFileRecordOwnedCount(); - logger_->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s", flowFile->getOffset(), - flowFile->getSize(), - flowFile->getResourceClaim()->getContentFullPath().c_str(), + logger_->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s", flowFile->getOffset(), flowFile->getSize(), flowFile->getResourceClaim()->getContentFullPath().c_str(), flowFile->getUUIDStr().c_str()); stream->closeStream(); - std::string details = process_context_->getProcessorNode().getName() + " modify flow record content " + flowFile->getUUIDStr(); + std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flowFile->getUUIDStr(); uint64_t endTime = getTimeMillis(); provenance_report_->modifyContent(flowFile, details, endTime - startTime); flows.push_back(flowFile); @@ -768,7 +767,7 @@ void ProcessSession::import(std::string source, std::shared_ptr<core::FlowFile> if (!keepSource) std::remove(source.c_str()); std::stringstream details; - details << process_context_->getProcessorNode().getName() << " modify flow record content " << flow->getUUIDStr(); + details << process_context_->getProcessorNode()->getName() << " modify flow record content " << flow->getUUIDStr(); uint64_t endTime = getTimeMillis(); provenance_report_->modifyContent(flow, details.str(), endTime - startTime); } else { @@ -811,10 +810,10 @@ void ProcessSession::commit() { if (itRelationship != _transferRelationship.end()) { Relationship relationship = itRelationship->second; // Find the relationship, we need to find the connections for that relationship - std::set<std::shared_ptr<Connectable>> connections = process_context_->getProcessorNode().getOutGoingConnections(relationship.getName()); + std::set<std::shared_ptr<Connectable>> connections = process_context_->getProcessorNode()->getOutGoingConnections(relationship.getName()); if (connections.empty()) { // No connection - if (!process_context_->getProcessorNode().isAutoTerminated(relationship)) { + if (!process_context_->getProcessorNode()->isAutoTerminated(relationship)) { // Not autoterminate, we should have the connect std::string message = "Connect empty for non auto terminated relationship" + relationship.getName(); throw Exception(PROCESS_SESSION_EXCEPTION, message.c_str()); @@ -855,10 +854,10 @@ void ProcessSession::commit() { if (itRelationship != _transferRelationship.end()) { Relationship relationship = itRelationship->second; // Find the relationship, we need to find the connections for that relationship - std::set<std::shared_ptr<Connectable>> connections = process_context_->getProcessorNode().getOutGoingConnections(relationship.getName()); + std::set<std::shared_ptr<Connectable>> connections = process_context_->getProcessorNode()->getOutGoingConnections(relationship.getName()); if (connections.empty()) { // No connection - if (!process_context_->getProcessorNode().isAutoTerminated(relationship)) { + if (!process_context_->getProcessorNode()->isAutoTerminated(relationship)) { // Not autoterminate, we should have the connect std::string message = "Connect empty for non auto terminated relationship " + relationship.getName(); throw Exception(PROCESS_SESSION_EXCEPTION, message.c_str()); @@ -931,7 +930,7 @@ void ProcessSession::commit() { _originalFlowFiles.clear(); // persistent the provenance report this->provenance_report_->commit(); - logger_->log_trace("ProcessSession committed for %s", process_context_->getProcessorNode().getName().c_str()); + logger_->log_trace("ProcessSession committed for %s", process_context_->getProcessorNode()->getName().c_str()); } catch (std::exception &exception) { logger_->log_debug("Caught Exception %s", exception.what()); throw; @@ -960,7 +959,7 @@ void ProcessSession::rollback() { _addedFlowFiles.clear(); _updatedFlowFiles.clear(); _deletedFlowFiles.clear(); - logger_->log_debug("ProcessSession rollback for %s", process_context_->getProcessorNode().getName().c_str()); + logger_->log_debug("ProcessSession rollback for %s", process_context_->getProcessorNode()->getName().c_str()); } catch (std::exception &exception) { logger_->log_debug("Caught Exception %s", exception.what()); throw; @@ -971,10 +970,10 @@ void ProcessSession::rollback() { } std::shared_ptr<core::FlowFile> ProcessSession::get() { - std::shared_ptr<Connectable> first = process_context_->getProcessorNode().getNextIncomingConnection(); + std::shared_ptr<Connectable> first = process_context_->getProcessorNode()->getNextIncomingConnection(); if (first == NULL) { - logger_->log_debug("Get is null for %s", process_context_->getProcessorNode().getName()); + logger_->log_debug("Get is null for %s", process_context_->getProcessorNode()->getName()); return NULL; } @@ -988,7 +987,7 @@ std::shared_ptr<core::FlowFile> ProcessSession::get() { for (std::set<std::shared_ptr<core::FlowFile> >::iterator it = expired.begin(); it != expired.end(); ++it) { std::shared_ptr<core::FlowFile> record = *it; std::stringstream details; - details << process_context_->getProcessorNode().getName() << " expire flow record " << record->getUUIDStr(); + details << process_context_->getProcessorNode()->getName() << " expire flow record " << record->getUUIDStr(); provenance_report_->expire(record, details.str()); } } @@ -1004,7 +1003,7 @@ std::shared_ptr<core::FlowFile> ProcessSession::get() { _originalFlowFiles[snapshot->getUUIDStr()] = snapshot; return ret; } - current = std::static_pointer_cast<Connection>(process_context_->getProcessorNode().getNextIncomingConnection()); + current = std::static_pointer_cast<Connection>(process_context_->getProcessorNode()->getNextIncomingConnection()); } while (current != NULL && current != first); return NULL; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/core/ProcessSessionFactory.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ProcessSessionFactory.cpp b/libminifi/src/core/ProcessSessionFactory.cpp index 570d895..4a6f21a 100644 --- a/libminifi/src/core/ProcessSessionFactory.cpp +++ b/libminifi/src/core/ProcessSessionFactory.cpp @@ -27,8 +27,8 @@ namespace nifi { namespace minifi { namespace core { -std::unique_ptr<ProcessSession> ProcessSessionFactory::createSession() { - return std::unique_ptr < ProcessSession > (new ProcessSession(process_context_)); +std::shared_ptr<ProcessSession> ProcessSessionFactory::createSession() { + return std::make_shared<ProcessSession>(process_context_); } } /* namespace core */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/core/Processor.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp index 0c2e7cf..d35f283 100644 --- a/libminifi/src/core/Processor.cpp +++ b/libminifi/src/core/Processor.cpp @@ -71,6 +71,9 @@ bool Processor::isRunning() { void Processor::setScheduledState(ScheduledState state) { state_ = state; + if (state == STOPPED) { + notifyStop(); + } } bool Processor::addConnection(std::shared_ptr<Connectable> conn) { @@ -80,8 +83,8 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) { logger_->log_info("Can not add connection while the process %s is running", name_.c_str()); return false; } - std::shared_ptr<Connection> connection = std::static_pointer_cast < Connection > (conn); - std::lock_guard < std::mutex > lock(mutex_); + std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn); + std::lock_guard<std::mutex> lock(mutex_); uuid_t srcUUID; uuid_t destUUID; @@ -141,12 +144,12 @@ void Processor::removeConnection(std::shared_ptr<Connectable> conn) { return; } - std::lock_guard < std::mutex > lock(mutex_); + std::lock_guard<std::mutex> lock(mutex_); uuid_t srcUUID; uuid_t destUUID; - std::shared_ptr<Connection> connection = std::static_pointer_cast < Connection > (conn); + std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn); connection->getSourceUUID(srcUUID); connection->getDestinationUUID(destUUID); @@ -178,13 +181,13 @@ void Processor::removeConnection(std::shared_ptr<Connectable> conn) { } bool Processor::flowFilesQueued() { - std::lock_guard < std::mutex > lock(mutex_); + std::lock_guard<std::mutex> lock(mutex_); if (_incomingConnections.size() == 0) return false; for (auto &&conn : _incomingConnections) { - std::shared_ptr<Connection> connection = std::static_pointer_cast < Connection > (conn); + std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn); if (connection->getQueueSize() > 0) return true; } @@ -193,13 +196,13 @@ bool Processor::flowFilesQueued() { } bool Processor::flowFilesOutGoingFull() { - std::lock_guard < std::mutex > lock(mutex_); + std::lock_guard<std::mutex> lock(mutex_); for (auto &&connection : out_going_connections_) { // We already has connection for this relationship std::set<std::shared_ptr<Connectable>> existedConnection = connection.second; for (const auto conn : existedConnection) { - std::shared_ptr < Connection > connection = std::static_pointer_cast < Connection > (conn); + std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn); if (connection->isFull()) return true; } @@ -226,13 +229,31 @@ void Processor::onTrigger(ProcessContext *context, ProcessSessionFactory *sessio } } +void Processor::onTrigger(std::shared_ptr<ProcessContext> context, std::shared_ptr<ProcessSessionFactory> sessionFactory) { + auto session = sessionFactory->createSession(); + + try { + // Call the virtual trigger function + onTrigger(context, session); + session->commit(); + } catch (std::exception &exception) { + logger_->log_debug("Caught Exception %s", exception.what()); + session->rollback(); + throw; + } catch (...) { + logger_->log_debug("Caught Exception Processor::onTrigger"); + session->rollback(); + throw; + } +} + bool Processor::isWorkAvailable() { // We have work if any incoming connection has work bool hasWork = false; try { for (const auto &conn : _incomingConnections) { - std::shared_ptr<Connection> connection = std::static_pointer_cast < Connection > (conn); + std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn); if (connection->getQueueSize() > 0) { hasWork = true; break; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/core/ProcessorNode.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ProcessorNode.cpp b/libminifi/src/core/ProcessorNode.cpp index 05f31a0..0a21f4d 100644 --- a/libminifi/src/core/ProcessorNode.cpp +++ b/libminifi/src/core/ProcessorNode.cpp @@ -17,13 +17,14 @@ #include "core/ProcessorNode.h" #include <memory> +#include <utility> namespace org { namespace apache { namespace nifi { namespace minifi { namespace core { -ProcessorNode::ProcessorNode(const std::shared_ptr<Connectable> processor) +ProcessorNode::ProcessorNode(const std::shared_ptr<Connectable> &processor) : processor_(processor), Connectable(processor->getName(), 0), ConfigurableComponent() { @@ -40,6 +41,11 @@ ProcessorNode::ProcessorNode(const ProcessorNode &other) setUUID(copy); } +ProcessorNode::ProcessorNode(const ProcessorNode &&other) + : Connectable(std::move(other)), + processor_(std::move(other.processor_)) { +} + ProcessorNode::~ProcessorNode() { } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/core/Repository.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/Repository.cpp b/libminifi/src/core/Repository.cpp index cf26a0d..be21b16 100644 --- a/libminifi/src/core/Repository.cpp +++ b/libminifi/src/core/Repository.cpp @@ -54,10 +54,13 @@ void Repository::stop() { } // repoSize -uint64_t Repository::repoSize() { +uint64_t Repository::getRepoSize() { return repo_size_; } +void Repository::flush() { +} + } /* namespace core */ } /* namespace minifi */ } /* namespace nifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/core/RepositoryFactory.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/RepositoryFactory.cpp b/libminifi/src/core/RepositoryFactory.cpp index 9e99718..b25e87c 100644 --- a/libminifi/src/core/RepositoryFactory.cpp +++ b/libminifi/src/core/RepositoryFactory.cpp @@ -67,13 +67,13 @@ std::shared_ptr<core::Repository> createRepository(const std::string configurati return return_obj; } if (fail_safe) { - return std::make_shared < core::Repository > ("fail_safe", "fail_safe", 1, 1, 1); + return std::make_shared<core::Repository>("fail_safe", "fail_safe", 1, 1, 1); } else { throw std::runtime_error("Support for the provided configuration class could not be found"); } } catch (const std::runtime_error &r) { if (fail_safe) { - return std::make_shared < core::Repository > ("fail_safe", "fail_safe", 1, 1, 1); + return std::make_shared<core::Repository>("fail_safe", "fail_safe", 1, 1, 1); } } @@ -96,13 +96,13 @@ std::shared_ptr<core::ContentRepository> createContentRepository(const std::stri return return_obj; } if (fail_safe) { - return std::make_shared < core::repository::FileSystemRepository > ("fail_safe"); + return std::make_shared<core::repository::FileSystemRepository>("fail_safe"); } else { throw std::runtime_error("Support for the provided configuration class could not be found"); } } catch (const std::runtime_error &r) { if (fail_safe) { - return std::make_shared < core::repository::FileSystemRepository > ("fail_safe"); + return std::make_shared<core::repository::FileSystemRepository>("fail_safe"); } } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/core/controller/StandardControllerServiceNode.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/controller/StandardControllerServiceNode.cpp b/libminifi/src/core/controller/StandardControllerServiceNode.cpp index 69004c1..5c4aa70 100644 --- a/libminifi/src/core/controller/StandardControllerServiceNode.cpp +++ b/libminifi/src/core/controller/StandardControllerServiceNode.cpp @@ -27,12 +27,12 @@ namespace minifi { namespace core { namespace controller { std::shared_ptr<core::ProcessGroup> &StandardControllerServiceNode::getProcessGroup() { - std::lock_guard < std::mutex > lock(mutex_); + std::lock_guard<std::mutex> lock(mutex_); return process_group_; } void StandardControllerServiceNode::setProcessGroup(std::shared_ptr<ProcessGroup> &processGroup) { - std::lock_guard < std::mutex > lock(mutex_); + std::lock_guard<std::mutex> lock(mutex_); process_group_ = processGroup; } @@ -45,7 +45,7 @@ bool StandardControllerServiceNode::enable() { for (auto linked_service : property.getValues()) { std::shared_ptr<ControllerServiceNode> csNode = provider->getControllerServiceNode(linked_service); if (nullptr != csNode) { - std::lock_guard < std::mutex > lock(mutex_); + std::lock_guard<std::mutex> lock(mutex_); linked_controller_services_.push_back(csNode); } } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/core/logging/LoggerConfiguration.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/logging/LoggerConfiguration.cpp b/libminifi/src/core/logging/LoggerConfiguration.cpp index 4b97055..c06239b 100644 --- a/libminifi/src/core/logging/LoggerConfiguration.cpp +++ b/libminifi/src/core/logging/LoggerConfiguration.cpp @@ -56,19 +56,19 @@ std::vector<std::string> LoggerProperties::get_keys_of_type(const std::string &t LoggerConfiguration::LoggerConfiguration() : root_namespace_(create_default_root()), loggers(std::vector<std::shared_ptr<LoggerImpl>>()), - formatter_(std::make_shared < spdlog::pattern_formatter > (spdlog_default_pattern)) { - logger_ = std::shared_ptr < LoggerImpl > (new LoggerImpl(core::getClassName<LoggerConfiguration>(), get_logger(nullptr, root_namespace_, core::getClassName<LoggerConfiguration>(), formatter_))); + formatter_(std::make_shared<spdlog::pattern_formatter>(spdlog_default_pattern)) { + logger_ = std::shared_ptr<LoggerImpl>(new LoggerImpl(core::getClassName<LoggerConfiguration>(), get_logger(nullptr, root_namespace_, core::getClassName<LoggerConfiguration>(), formatter_))); loggers.push_back(logger_); } void LoggerConfiguration::initialize(const std::shared_ptr<LoggerProperties> &logger_properties) { - std::lock_guard < std::mutex > lock(mutex); + std::lock_guard<std::mutex> lock(mutex); root_namespace_ = initialize_namespaces(logger_properties); std::string spdlog_pattern; if (!logger_properties->get("spdlog.pattern", spdlog_pattern)) { spdlog_pattern = spdlog_default_pattern; } - formatter_ = std::make_shared < spdlog::pattern_formatter > (spdlog_pattern); + formatter_ = std::make_shared<spdlog::pattern_formatter>(spdlog_pattern); std::map<std::string, std::shared_ptr<spdlog::logger>> spdloggers; for (auto const & logger_impl : loggers) { std::shared_ptr<spdlog::logger> spdlogger; @@ -85,8 +85,8 @@ void LoggerConfiguration::initialize(const std::shared_ptr<LoggerProperties> &lo } std::shared_ptr<Logger> LoggerConfiguration::getLogger(const std::string &name) { - std::lock_guard < std::mutex > lock(mutex); - std::shared_ptr<LoggerImpl> result = std::make_shared < LoggerImpl > (name, get_logger(logger_, root_namespace_, name, formatter_)); + std::lock_guard<std::mutex> lock(mutex); + std::shared_ptr<LoggerImpl> result = std::make_shared<LoggerImpl>(name, get_logger(logger_, root_namespace_, name, formatter_)); loggers.push_back(result); return result; } @@ -130,7 +130,7 @@ std::shared_ptr<internal::LoggerNamespace> LoggerConfiguration::initialize_names } catch (const std::out_of_range &oor) { } } - sink_map[appender_name] = std::make_shared < spdlog::sinks::rotating_file_sink_mt > (file_name, max_file_size, max_files); + sink_map[appender_name] = std::make_shared<spdlog::sinks::rotating_file_sink_mt>(file_name, max_file_size, max_files); } else if ("stdout" == appender_type) { sink_map[appender_name] = spdlog::sinks::stdout_sink_mt::instance(); } else { @@ -227,7 +227,7 @@ std::shared_ptr<spdlog::logger> LoggerConfiguration::get_logger(std::shared_ptr< if (logger != nullptr) { logger->log_debug("%s logger got sinks from namespace %s and level %s from namespace %s", name, sink_namespace_str, spdlog::level::level_names[level], level_namespace_str); } - spdlogger = std::make_shared < spdlog::logger > (name, begin(sinks), end(sinks)); + spdlogger = std::make_shared<spdlog::logger>(name, begin(sinks), end(sinks)); spdlogger->set_level(level); spdlogger->set_formatter(formatter); spdlogger->flush_on(std::max(spdlog::level::info, current_namespace->level)); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp b/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp index d4059d6..c556701 100644 --- a/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp +++ b/libminifi/src/core/reporting/SiteToSiteProvenanceReportingTask.cpp @@ -57,7 +57,7 @@ void SiteToSiteProvenanceReportingTask::getJsonReport(core::ProcessContext *cont std::string &report) { Json::Value array; for (auto sercomp : records) { - std::shared_ptr<provenance::ProvenanceEventRecord> record = std::dynamic_pointer_cast < provenance::ProvenanceEventRecord > (sercomp); + std::shared_ptr<provenance::ProvenanceEventRecord> record = std::dynamic_pointer_cast<provenance::ProvenanceEventRecord>(sercomp); if (nullptr == record) { break; } @@ -119,7 +119,7 @@ void SiteToSiteProvenanceReportingTask::onTrigger(core::ProcessContext *context, if (!protocol_->bootstrap()) { // bootstrap the client protocol if needeed context->yield(); - std::shared_ptr<Processor> processor = std::static_pointer_cast < Processor > (context->getProcessorNode().getProcessor()); + std::shared_ptr<Processor> processor = std::static_pointer_cast<Processor>(context->getProcessorNode()->getProcessor()); logger_->log_error("Site2Site bootstrap failed yield period %d peer ", processor->getYieldPeriodMsec()); returnProtocol(std::move(protocol_)); return; @@ -130,7 +130,7 @@ void SiteToSiteProvenanceReportingTask::onTrigger(core::ProcessContext *context, logger_->log_debug("batch size %d records", batch_size_); size_t deserialized = batch_size_; std::shared_ptr<core::Repository> repo = context->getProvenanceRepository(); - std::function < std::shared_ptr<core::SerializableComponent>() > constructor = []() {return std::make_shared<provenance::ProvenanceEventRecord>();}; + std::function<std::shared_ptr<core::SerializableComponent>()> constructor = []() {return std::make_shared<provenance::ProvenanceEventRecord>();}; if (!repo->DeSerialize(records, deserialized, constructor) && deserialized == 0) { logger_->log_debug("Not sending because deserialized is %d", deserialized); returnProtocol(std::move(protocol_)); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/core/repository/FileSystemRepository.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/repository/FileSystemRepository.cpp b/libminifi/src/core/repository/FileSystemRepository.cpp index fba1fe3..4945b31 100644 --- a/libminifi/src/core/repository/FileSystemRepository.cpp +++ b/libminifi/src/core/repository/FileSystemRepository.cpp @@ -37,6 +37,11 @@ std::shared_ptr<io::BaseStream> FileSystemRepository::write(const std::shared_pt return std::make_shared<io::FileStream>(claim->getContentFullPath()); } +bool FileSystemRepository::exists(const std::shared_ptr<minifi::ResourceClaim> &streamId) { + std::ifstream file(streamId->getContentFullPath()); + return file.good(); +} + std::shared_ptr<io::BaseStream> FileSystemRepository::read(const std::shared_ptr<minifi::ResourceClaim> &claim) { return std::make_shared<io::FileStream>(claim->getContentFullPath(), 0, false); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/core/repository/FlowFileRepository.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/repository/FlowFileRepository.cpp b/libminifi/src/core/repository/FlowFileRepository.cpp index ac092ea..3ed7fbf 100644 --- a/libminifi/src/core/repository/FlowFileRepository.cpp +++ b/libminifi/src/core/repository/FlowFileRepository.cpp @@ -16,6 +16,7 @@ * limitations under the License. */ #include "core/repository/FlowFileRepository.h" +#include "leveldb/write_batch.h" #include <memory> #include <string> #include <utility> @@ -29,46 +30,58 @@ namespace minifi { namespace core { namespace repository { +void FlowFileRepository::flush() { + leveldb::WriteBatch batch; + std::string key; + std::string value; + leveldb::ReadOptions options; + + std::vector<std::shared_ptr<FlowFileRecord>> purgeList; + + uint64_t decrement_total = 0; + while (keys_to_delete.size_approx() > 0) { + if (keys_to_delete.try_dequeue(key)) { + db_->Get(options, key, &value); + decrement_total += value.size(); + std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this(), content_repo_); + if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(value.data()), value.size())) { + purgeList.push_back(eventRead); + } + logger_->log_info("Issuing batch delete, including %s, Content path %s", eventRead->getUUIDStr(), eventRead->getContentFullPath()); + batch.Delete(key); + } + } + if (db_->Write(leveldb::WriteOptions(), &batch).ok()) { + logger_->log_info("Decrementing %u from a repo size of %u", decrement_total, repo_size_.load()); + if (decrement_total > repo_size_.load()) { + repo_size_ = 0; + } else { + repo_size_ -= decrement_total; + } + } + + if (nullptr != content_repo_) { + for (const auto &ffr : purgeList) { + auto claim = ffr->getResourceClaim(); + if (claim != nullptr) { + content_repo_->removeIfOrphaned(claim); + } + } + } +} + void FlowFileRepository::run() { // threshold for purge uint64_t purgeThreshold = max_partition_bytes_ * 3 / 4; + while (running_) { std::this_thread::sleep_for(std::chrono::milliseconds(purge_period_)); uint64_t curTime = getTimeMillis(); - uint64_t size = repoSize(); - if (size >= purgeThreshold) { - std::vector<std::shared_ptr<FlowFileRecord>> purgeList; - std::vector<std::pair<std::string, uint64_t>> keyRemovalList; - leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions()); - for (it->SeekToFirst(); it->Valid(); it->Next()) { - std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this(), content_repo_); - std::string key = it->key().ToString(); - if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(it->value().data()), it->value().size())) { - if ((curTime - eventRead->getEventTime()) > max_partition_millis_) { - purgeList.push_back(eventRead); - keyRemovalList.push_back(std::make_pair(key, it->value().size())); - } - } else { - logger_->log_debug("NiFi %s retrieve event %s fail", name_.c_str(), key.c_str()); - keyRemovalList.push_back(std::make_pair(key, it->value().size())); - } - } - delete it; - for (auto eventId : keyRemovalList) { - logger_->log_info("Repository Repo %s Purge %s", name_.c_str(), eventId.first.c_str()); - if (Delete(eventId.first)) { - repo_size_ -= eventId.second; - } - } + flush(); + + uint64_t size = getRepoSize(); - for (const auto &ffr : purgeList) { - auto claim = ffr->getResourceClaim(); - if (claim != nullptr) { - content_repo_->remove(claim); - } - } - } if (size > max_partition_bytes_) repo_full_ = true; else @@ -81,22 +94,25 @@ void FlowFileRepository::loadComponent(const std::shared_ptr<core::ContentReposi std::vector<std::pair<std::string, uint64_t>> purgeList; leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions()); + repo_size_ = 0; for (it->SeekToFirst(); it->Valid(); it->Next()) { std::shared_ptr<FlowFileRecord> eventRead = std::make_shared<FlowFileRecord>(shared_from_this(), content_repo_); std::string key = it->key().ToString(); repo_size_ += it->value().size(); if (eventRead->DeSerialize(reinterpret_cast<const uint8_t *>(it->value().data()), it->value().size())) { + logger_->log_info("Found connection for %s, path %s ", eventRead->getConnectionUuid(), eventRead->getContentFullPath()); auto search = connectionMap.find(eventRead->getConnectionUuid()); if (search != connectionMap.end()) { // we find the connection for the persistent flowfile, create the flowfile and enqueue that std::shared_ptr<core::FlowFile> flow_file_ref = std::static_pointer_cast<core::FlowFile>(eventRead); - std::shared_ptr<FlowFileRecord> record = std::make_shared<FlowFileRecord>(shared_from_this(), content_repo_); - // set store to repo to true so that we do need to persistent again in enqueue - record->setStoredToRepository(true); - search->second->put(record); + eventRead->setStoredToRepository(true); + search->second->put(eventRead); } else { + logger_->log_info("Could not find connectinon for %s, path %s ", eventRead->getConnectionUuid(), eventRead->getContentFullPath()); if (eventRead->getContentFullPath().length() > 0) { - std::remove(eventRead->getContentFullPath().c_str()); + if (nullptr != eventRead->getResourceClaim()) { + content_repo_->remove(eventRead->getResourceClaim()); + } } purgeList.push_back(std::make_pair(key, it->value().size())); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/9f161a27/libminifi/src/core/repository/VolatileContentRepository.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/repository/VolatileContentRepository.cpp b/libminifi/src/core/repository/VolatileContentRepository.cpp index ac575c5..65f1cf9 100644 --- a/libminifi/src/core/repository/VolatileContentRepository.cpp +++ b/libminifi/src/core/repository/VolatileContentRepository.cpp @@ -132,6 +132,23 @@ std::shared_ptr<io::BaseStream> VolatileContentRepository::write(const std::shar return nullptr; } +bool VolatileContentRepository::exists(const std::shared_ptr<minifi::ResourceClaim> &claim) { + logger_->log_debug("enter exists"); + int size = 0; + { + std::lock_guard<std::mutex> lock(map_mutex_); + auto claim_check = master_list_.find(claim->getContentFullPath()); + if (claim_check != master_list_.end()) { + auto ent = claim_check->second->takeOwnership(); + if (ent == nullptr) { + return false; + } + return true; + } + } + return false; +} + std::shared_ptr<io::BaseStream> VolatileContentRepository::read(const std::shared_ptr<minifi::ResourceClaim> &claim) { logger_->log_debug("enter read"); int size = 0;