This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push: new e31a9958a MINIFICPP-1831 Asset download through c2 e31a9958a is described below commit e31a9958a64fdbc016f059793095bed9fc3f7cda Author: Adam Debreceni <adebrec...@apache.org> AuthorDate: Fri May 20 17:46:11 2022 +0200 MINIFICPP-1831 Asset download through c2 Closes #1332 Signed-off-by: Marton Szasz <sza...@apache.org> --- extensions/http-curl/protocols/RESTSender.cpp | 2 +- extensions/http-curl/tests/C2UpdateAssetTest.cpp | 260 +++++++++++++++++++++++ extensions/http-curl/tests/CMakeLists.txt | 1 + extensions/http-curl/tests/HTTPHandlers.h | 39 +++- libminifi/include/c2/C2Agent.h | 1 + libminifi/include/c2/C2Payload.h | 11 +- libminifi/include/properties/Configuration.h | 2 + libminifi/src/Configuration.cpp | 3 +- libminifi/src/c2/C2Agent.cpp | 111 ++++++++++ 9 files changed, 418 insertions(+), 12 deletions(-) diff --git a/extensions/http-curl/protocols/RESTSender.cpp b/extensions/http-curl/protocols/RESTSender.cpp index c94e1c0a4..7f593684d 100644 --- a/extensions/http-curl/protocols/RESTSender.cpp +++ b/extensions/http-curl/protocols/RESTSender.cpp @@ -160,7 +160,7 @@ C2Payload RESTSender::sendPayload(const std::string url, const Direction directi } const auto response_body_bytes = gsl::make_span(client.getResponseBody()).as_span<const std::byte>(); logger_->log_trace("Received response: \"%s\"", [&] {return utils::StringUtils::escapeUnprintableBytes(response_body_bytes);}); - if (isOkay && respCode) { + if (isOkay && !clientError && !serverError) { if (payload.isRaw()) { C2Payload response_payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true); response_payload.setRawData(response_body_bytes); diff --git a/extensions/http-curl/tests/C2UpdateAssetTest.cpp b/extensions/http-curl/tests/C2UpdateAssetTest.cpp new file mode 100644 index 000000000..f1e0136b6 --- /dev/null +++ b/extensions/http-curl/tests/C2UpdateAssetTest.cpp @@ -0,0 +1,260 @@ +/** + * + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#undef NDEBUG +#include <vector> +#include <string> +#include <fstream> +#include <iterator> + +#include "HTTPIntegrationBase.h" +#include "HTTPHandlers.h" +#include "utils/IntegrationTestUtils.h" +#include "utils/Environment.h" + +class FileProvider : public ServerAwareHandler { + public: + explicit FileProvider(std::string file_content): file_content_(std::move(file_content)) {} + + bool handleGet(CivetServer* /*server*/, struct mg_connection* conn) override { + mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " + "text/plain\r\nContent-Length: %zu\r\nConnection: close\r\n\r\n", + file_content_.length()); + mg_write(conn, file_content_.data(), file_content_.size()); // exactly Content-Length bytes, even with embedded NULs + return true; + } + + private: + std::string file_content_; +}; + +class C2HeartbeatHandler : public HeartbeatHandler { + public: + using HeartbeatHandler::HeartbeatHandler; + + bool handlePost(CivetServer* /*server*/, struct mg_connection* conn) override { + std::lock_guard<std::mutex> guard(op_mtx_); + sendHeartbeatResponse(operations_, conn); + operations_.clear(); + return true; + } + + void addOperation(std::string id, std::unordered_map<std::string, std::string> args) { + std::lock_guard<std::mutex> guard(op_mtx_); + 
operations_.push_back(C2Operation{ + .operation = "update", + .operand = "asset", + .operation_id = std::move(id), + .args = std::move(args) + }); + } + + private: + std::mutex op_mtx_; + std::vector<C2Operation> operations_; +}; + +class VerifyC2AssetUpdate : public VerifyC2Base { + public: + void configureC2() override { + configuration->set("nifi.c2.agent.protocol.class", "RESTSender"); + configuration->set("nifi.c2.enable", "true"); + configuration->set("nifi.c2.agent.heartbeat.period", "100"); + } + + void runAssertions() override { + assert(utils::verifyEventHappenedInPollTime(std::chrono::seconds(10), verify_)); + } + + void setVerifier(std::function<bool()> verify) { + verify_ = std::move(verify); + } + + private: + std::function<bool()> verify_; +}; + +struct AssetUpdateOperation { + std::string id; + std::unordered_map<std::string, std::string> args; + std::string state; + std::optional<std::string> details; +}; + +int main() { + TestController controller; + + // setup minifi home + const std::filesystem::path home_dir = controller.createTempDirectory(); + const auto asset_dir = home_dir / "asset"; + utils::Environment::setCurrentWorkingDirectory(home_dir.string().c_str()); + + C2AcknowledgeHandler ack_handler; + std::string file_A = "hello from file A"; + FileProvider file_A_provider{file_A}; + std::string file_B = "hello from file B"; + FileProvider file_B_provider{file_B}; + C2HeartbeatHandler hb_handler{std::make_shared<minifi::Configure>()}; + + VerifyC2AssetUpdate harness; + harness.setUrl("http://localhost:0/api/file/A.txt", &file_A_provider); + harness.setUrl("http://localhost:0/api/file/B.txt", &file_B_provider); + + std::string absolute_file_A_url = "http://localhost:" + harness.getWebPort() + "/api/file/A.txt"; + + std::vector<AssetUpdateOperation> operations; + + operations.push_back({ + .id = "1", + .args = {}, + .state = "NOT_APPLIED", + .details = "Couldn't find 'file' argument" + }); + + operations.push_back({ + .id = "2", + .args = { + 
{"file", "my_file.txt"} + }, + .state = "NOT_APPLIED", + .details = "Couldn't find 'url' argument" + }); + + operations.push_back({ + .id = "3", + .args = { + {"file", "my_file.txt"}, + {"url", "/api/file/A.txt"} + }, + .state = "FULLY_APPLIED", + .details = std::nullopt + }); + + operations.push_back({ + .id = "4", + .args = { + {"file", "my_file.txt"}, + {"url", "/api/file/A.txt"} + }, + .state = "NO_OPERATION", + .details = std::nullopt + }); + + operations.push_back({ + .id = "5", + .args = { + {"file", "my_file.txt"}, + {"url", "/api/file/B.txt"}, + {"forceDownload", "true"} + }, + .state = "FULLY_APPLIED", + .details = std::nullopt + }); + + operations.push_back({ + .id = "6", + .args = { + {"file", "new_dir/inner/my_file.txt"}, + {"url", "/api/file/A.txt"} + }, + .state = "FULLY_APPLIED", + .details = std::nullopt + }); + + operations.push_back({ + .id = "7", + .args = { + {"file", "dummy.txt"}, + {"url", "/not_existing_api/file.txt"} + }, + .state = "NOT_APPLIED", + .details = "Failed to fetch asset" + }); + + operations.push_back({ + .id = "8", + .args = { + {"file", "../../system_lib.dll"}, + {"url", "/not_existing_api/file.txt"} + }, + .state = "NOT_APPLIED", + .details = "Accessing parent directory is forbidden in file path" + }); + + operations.push_back({ + .id = "9", + .args = { + {"file", "other_dir/A.txt"}, + {"url", absolute_file_A_url} + }, + .state = "FULLY_APPLIED", + .details = std::nullopt + }); + + for (auto& op : operations) { + hb_handler.addOperation(op.id, op.args); + } + + harness.setVerifier([&] () -> bool { + for (auto& op : operations) { + if (auto res = ack_handler.getState(op.id)) { + if (res->state != op.state) { + controller.getLogger()->log_error("Operation '%s' is expected to return '%s', got '%s'", op.id, op.state, res->state); + assert(false); + } + if (op.details) { + if (res->details.find(op.details.value()) == std::string::npos) { + controller.getLogger()->log_error("In operation '%s' failed to find '%s' in ack details 
'%s'", op.id, op.details.value(), res->details); + assert(false); + } + } + } else { + return false; + } + } + return true; + }); + + harness.setUrl("http://localhost:0/api/heartbeat", &hb_handler); + harness.setUrl("http://localhost:0/api/acknowledge", &ack_handler); + harness.setC2Url("/api/heartbeat", "/api/acknowledge"); + + harness.run(); + + std::unordered_map<std::string, std::string> expected_files; + // verify directory structure + for (auto& op : operations) { + if (op.state != "FULLY_APPLIED") { + // this op failed no file made on the disk + continue; + } + expected_files[(asset_dir / op.args["file"]).string()] = utils::StringUtils::endsWith(op.args["url"], "A.txt") ? file_A : file_B; + } + + size_t file_count = utils::file::list_dir_all(asset_dir.string(), controller.getLogger()).size(); + if (file_count != expected_files.size()) { + controller.getLogger()->log_error("Expected %zu files, got %zu", expected_files.size(), file_count); + assert(false); + } + for (auto& [path, content] : expected_files) { + if (utils::file::get_content(path) != content) { + controller.getLogger()->log_error("File content mismatch at '%s'", path); + assert(false); + } + } +} diff --git a/extensions/http-curl/tests/CMakeLists.txt b/extensions/http-curl/tests/CMakeLists.txt index 975b6e907..d8849c8db 100644 --- a/extensions/http-curl/tests/CMakeLists.txt +++ b/extensions/http-curl/tests/CMakeLists.txt @@ -102,3 +102,4 @@ add_test(NAME C2DebugBundleTest COMMAND C2DebugBundleTest) add_test(NAME C2PropertiesUpdateTests COMMAND C2PropertiesUpdateTests) add_test(NAME C2ClearCoreComponentStateTest COMMAND C2ClearCoreComponentStateTest "${TEST_RESOURCES}/TestC2DescribeCoreComponentState.yml" "${TEST_RESOURCES}/") add_test(NAME C2MultipleCommandsTest COMMAND C2MultipleCommandsTest "${TEST_RESOURCES}/TestC2DescribeCoreComponentState.yml" "${TEST_RESOURCES}/") +add_test(NAME C2UpdateAssetTest COMMAND C2UpdateAssetTest) diff --git a/extensions/http-curl/tests/HTTPHandlers.h
b/extensions/http-curl/tests/HTTPHandlers.h index 0174eb197..eeeea2dde 100644 --- a/extensions/http-curl/tests/HTTPHandlers.h +++ b/extensions/http-curl/tests/HTTPHandlers.h @@ -833,23 +833,36 @@ class RetryHttpGetResponder : public ServerAwareHandler { }; class C2AcknowledgeHandler : public ServerAwareHandler { + struct OpResult { + std::string state; + std::string details; + }; + public: bool handlePost(CivetServer* /*server*/, struct mg_connection* conn) override { std::string req = readPayload(conn); rapidjson::Document root; root.Parse(req.data(), req.size()); - if (root.IsObject() && root.HasMember("operationId")) { - std::lock_guard<std::mutex> guard(ack_operations_mtx_); - acknowledged_operations_.insert(root["operationId"].GetString()); - } + + std::string result_state; + std::string details; if (root.IsObject() && root.HasMember("operationState")) { - if (root["operationState"].IsObject() && root["operationState"].HasMember("state")) { - std::lock_guard<std::mutex> guard(apply_count_mtx_); - auto result_state = root["operationState"]["state"].GetString(); - ++apply_count_[result_state]; + if (root["operationState"].IsObject()) { + if (root["operationState"].HasMember("state") && root["operationState"]["state"].IsString()) { + result_state = root["operationState"]["state"].GetString(); + std::lock_guard<std::mutex> guard(apply_count_mtx_); + ++apply_count_[result_state]; + } + if (root["operationState"].HasMember("details") && root["operationState"]["details"].IsString()) { + details = root["operationState"]["details"].GetString(); + } } } + if (root.IsObject() && root.HasMember("operationId") && root["operationId"].IsString()) { + std::lock_guard<std::mutex> guard(ack_operations_mtx_); + acknowledged_operations_.insert_or_assign(root["operationId"].GetString(), OpResult{result_state, details}); // keep the latest ack for an id + } mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " 
"text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"); @@ -861,6 +874,14 @@ class C2AcknowledgeHandler : public ServerAwareHandler { return acknowledged_operations_.count(operation_id) > 0; } + std::optional<OpResult> getState(const 
std::string& operation_id) const { + std::lock_guard<std::mutex> guard(ack_operations_mtx_); + if (auto it = acknowledged_operations_.find(operation_id); it != acknowledged_operations_.end()) { + return it->second; + } + return std::nullopt; + } + uint32_t getApplyCount(const std::string& result_state) const { std::lock_guard<std::mutex> guard(apply_count_mtx_); return apply_count_.find(result_state) != apply_count_.end() ? apply_count_.at(result_state) : 0; @@ -869,6 +890,6 @@ class C2AcknowledgeHandler : public ServerAwareHandler { private: mutable std::mutex ack_operations_mtx_; mutable std::mutex apply_count_mtx_; - std::set<std::string> acknowledged_operations_; + std::unordered_map<std::string, OpResult> acknowledged_operations_; std::unordered_map<std::string, uint32_t> apply_count_; }; diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h index b32b029f6..5fb564786 100644 --- a/libminifi/include/c2/C2Agent.h +++ b/libminifi/include/c2/C2Agent.h @@ -173,6 +173,7 @@ class C2Agent : public state::UpdateController { bool handleConfigurationUpdate(const C2ContentResponse &resp); void handlePropertyUpdate(const C2ContentResponse &resp); + void handleAssetUpdate(const C2ContentResponse &resp); std::optional<std::string> resolveFlowUrl(const std::string& url) const; diff --git a/libminifi/include/c2/C2Payload.h b/libminifi/include/c2/C2Payload.h index badc5e0eb..de8790a3a 100644 --- a/libminifi/include/c2/C2Payload.h +++ b/libminifi/include/c2/C2Payload.h @@ -60,7 +60,8 @@ SMART_ENUM(DescribeOperand, SMART_ENUM(UpdateOperand, (CONFIGURATION, "configuration"), - (PROPERTIES, "properties") + (PROPERTIES, "properties"), + (ASSET, "asset") ) SMART_ENUM(TransferOperand, @@ -118,6 +119,13 @@ struct C2ContentResponse { friend std::ostream& operator<<(std::ostream& out, const C2ContentResponse& response); + std::optional<std::string> getArgument(const std::string& arg_name) const { + if (auto it = operation_arguments.find(arg_name); it != 
operation_arguments.end()) { + return it->second.to_string(); + } + return std::nullopt; + } + Operation op; // determines if the operation is required bool required{ false }; @@ -199,6 +207,7 @@ class C2Payload : public state::Update { */ [[nodiscard]] std::vector<std::byte> getRawData() const noexcept { return raw_data_; } [[nodiscard]] std::string getRawDataAsString() const { return utils::span_to<std::string>(gsl::make_span(getRawData()).as_span<const char>()); } + [[nodiscard]] std::vector<std::byte> moveRawData() && {return std::move(raw_data_);} /** * Add a nested payload. diff --git a/libminifi/include/properties/Configuration.h b/libminifi/include/properties/Configuration.h index 301bdafd2..aa0c05f3f 100644 --- a/libminifi/include/properties/Configuration.h +++ b/libminifi/include/properties/Configuration.h @@ -153,6 +153,8 @@ class Configuration : public Properties { static constexpr const char *nifi_log_compression_cached_log_max_size = "nifi.log.compression.cached.log.max.size"; static constexpr const char *nifi_log_compression_compressed_log_max_size = "nifi.log.compression.compressed.log.max.size"; + static constexpr const char *nifi_asset_directory = "nifi.asset.directory"; + MINIFIAPI static const std::vector<core::ConfigurationProperty> CONFIGURATION_PROPERTIES; MINIFIAPI static const std::array<const char*, 2> DEFAULT_SENSITIVE_PROPERTIES; diff --git a/libminifi/src/Configuration.cpp b/libminifi/src/Configuration.cpp index c8bc7e445..07970d51f 100644 --- a/libminifi/src/Configuration.cpp +++ b/libminifi/src/Configuration.cpp @@ -122,7 +122,8 @@ const std::vector<core::ConfigurationProperty> Configuration::CONFIGURATION_PROP core::ConfigurationProperty{Configuration::nifi_log_appender_syslog}, core::ConfigurationProperty{Configuration::nifi_log_logger_root}, core::ConfigurationProperty{Configuration::nifi_log_compression_cached_log_max_size, gsl::make_not_null(core::StandardValidators::get().DATA_SIZE_VALIDATOR.get())}, - 
core::ConfigurationProperty{Configuration::nifi_log_compression_compressed_log_max_size, gsl::make_not_null(core::StandardValidators::get().DATA_SIZE_VALIDATOR.get())} + core::ConfigurationProperty{Configuration::nifi_log_compression_compressed_log_max_size, gsl::make_not_null(core::StandardValidators::get().DATA_SIZE_VALIDATOR.get())}, + core::ConfigurationProperty{Configuration::nifi_asset_directory} }; const std::array<const char*, 2> Configuration::DEFAULT_SENSITIVE_PROPERTIES = {Configuration::nifi_security_client_pass_phrase, diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp index 3e6fc92a3..82ceded8d 100644 --- a/libminifi/src/c2/C2Agent.cpp +++ b/libminifi/src/c2/C2Agent.cpp @@ -601,6 +601,10 @@ void C2Agent::handle_update(const C2ContentResponse &resp) { handlePropertyUpdate(resp); break; } + case UpdateOperand::ASSET: { + handleAssetUpdate(resp); + break; + } } } @@ -900,6 +904,113 @@ bool C2Agent::handleConfigurationUpdate(const C2ContentResponse &resp) { return true; } +static auto make_path(const std::string& str) { + return std::filesystem::path(str); +} + +static std::optional<std::string> validateFilePath(const std::filesystem::path& path) { + if (path.empty()) { + return "Empty file path"; + } + if (!path.is_relative() || path.has_root_path()) { // on Windows "\dir\x" and "C:x" are not absolute, yet asset_dir / path would escape asset_dir + return "File path must be a relative path '" + path.string() + "'"; + } + if (!path.has_filename()) { + return "Filename missing in output path '" + path.string() + "'"; + } + if (path.filename() == "." 
|| path.filename() == "..") { + return "Invalid filename '" + path.filename().string() + "'"; + } + for (const auto& segment : path) { + if (segment == "..") { + return "Accessing parent directory is forbidden in file path '" + path.string() + "'"; + } + } + return std::nullopt; +} + +void C2Agent::handleAssetUpdate(const C2ContentResponse& resp) { + auto send_error = [&] (std::string_view error) { + logger_->log_error("%s", std::string(error)); + C2Payload response(Operation::ACKNOWLEDGE, state::UpdateState::SET_ERROR, resp.ident, true); + response.setRawData(gsl::span<const char>(error).as_span<const std::byte>()); + enqueue_c2_response(std::move(response)); + }; + std::filesystem::path asset_dir = std::filesystem::path(configuration_->getHome()) / "asset"; + if (auto asset_dir_str = configuration_->get(Configuration::nifi_asset_directory)) { + asset_dir = asset_dir_str.value(); + } + + // output file + std::filesystem::path file_path; + if (auto file_rel = resp.getArgument("file") | utils::map(make_path)) { + if (auto error = validateFilePath(file_rel.value())) { + send_error(error.value()); + return; + } + file_path = asset_dir / file_rel.value(); + } else { + send_error("Couldn't find 'file' argument"); + return; + } + + // source url + std::string url; + if (auto url_str = resp.getArgument("url")) { + if (auto resolved_url = resolveUrl(*url_str)) { + url = resolved_url.value(); + } else { + send_error("Couldn't resolve url"); + return; + } + } else { + send_error("Couldn't find 'url' argument"); + return; + } + + // forceDownload + bool force_download = false; + if (auto force_download_str = resp.getArgument("forceDownload")) { + if (utils::StringUtils::equalsIgnoreCase(force_download_str.value(), "true")) { + force_download = true; + } else if (utils::StringUtils::equalsIgnoreCase(force_download_str.value(), "false")) { + force_download = false; + } else { + send_error("Argument 'forceDownload' must be either 'true' or 'false'"); + return; + } + } + + if 
(!force_download && std::filesystem::exists(file_path)) { + logger_->log_info("File already exists"); + C2Payload response(Operation::ACKNOWLEDGE, state::UpdateState::NO_OPERATION, resp.ident, true); + enqueue_c2_response(std::move(response)); + return; + } + + C2Payload file_response = protocol_.load()->consumePayload(url, C2Payload(Operation::TRANSFER, true), RECEIVE, false); + + if (file_response.getStatus().getState() != state::UpdateState::READ_COMPLETE) { + send_error("Failed to fetch asset from '" + url + "'"); + return; + } + + auto raw_data = std::move(file_response).moveRawData(); + // ensure directory exists for file + if (utils::file::create_dir(file_path.parent_path().string()) != 0) { + send_error("Failed to create directory '" + file_path.parent_path().string() + "'"); + return; + } + + if (std::ofstream file{file_path, std::ofstream::binary}; !file.write(reinterpret_cast<const char*>(raw_data.data()), static_cast<std::streamsize>(raw_data.size())).flush()) { // a failed open/write/flush must not be acked as FULLY_APPLIED + send_error("Failed to write asset to '" + file_path.string() + "'"); + return; + } + + C2Payload response(Operation::ACKNOWLEDGE, state::UpdateState::FULLY_APPLIED, resp.ident, true); + enqueue_c2_response(std::move(response)); +} + void C2Agent::enqueue_c2_server_response(C2Payload &&resp) { logger_->log_trace("Server response: %s", [&] {return resp.str();});