szaszm commented on code in PR #2088:
URL: https://github.com/apache/nifi-minifi-cpp/pull/2088#discussion_r2694964837
##########
C2.md:
##########
@@ -115,6 +115,10 @@ be requested via C2 DESCRIBE manifest command.
# specify the maximum number of bulletins to send in a heartbeat
# nifi.c2.flow.info.processor.bulletin.limit=1000
+ # specify timeout for asset download operations
Review Comment:
```suggestion
# Specify timeout for asset download operations. The entire download must
# finish in the specified amount of time. There is a separate, fixed 30-second
# timeout counted from the last received data packet.
```
##########
libminifi/src/utils/file/AssetManager.cpp:
##########
@@ -121,51 +121,74 @@ std::string AssetManager::hash() const {
nonstd::expected<void, std::string> AssetManager::sync(
const AssetLayout& layout,
- const std::function<nonstd::expected<std::vector<std::byte>,
std::string>(std::string_view /*url*/)>& fetch) {
+ const std::function<nonstd::expected<void, std::string>(std::string_view
/*url*/, const std::filesystem::path& /*tmp_path*/)>& fetch) {
logger_->log_info("Synchronizing assets");
std::lock_guard lock(mtx_);
AssetLayout new_state{
.digest = state_.digest,
.assets = {}
};
- std::string fetch_errors;
- std::vector<std::pair<std::filesystem::path, std::vector<std::byte>>>
new_file_contents;
+ std::string new_asset_errors;
+ std::vector<AssetDescription> new_assets;
for (auto& new_entry : layout.assets) {
if (std::find_if(state_.assets.begin(), state_.assets.end(), [&] (auto&
entry) {return entry.id == new_entry.id;}) == state_.assets.end()) {
logger_->log_info("Fetching asset (id = '{}', path = '{}') from {}",
new_entry.id, new_entry.path.string(), new_entry.url);
- if (auto data = fetch(new_entry.url)) {
- new_file_contents.emplace_back(new_entry.path, data.value());
+ if (auto status = fetch(new_entry.url, (root_ / new_entry.path).string()
+ ".part")) {
+ new_assets.emplace_back(new_entry);
new_state.assets.insert(new_entry);
} else {
- logger_->log_error("Failed to fetch asset (id = '{}', path = '{}')
from {}: {}", new_entry.id, new_entry.path.string(), new_entry.url,
data.error());
- fetch_errors += "Failed to fetch '" + new_entry.id + "' from '" +
new_entry.url + "': " + data.error() + "\n";
+ logger_->log_error("Failed to fetch asset (id = '{}', path = '{}')
from {}: {}", new_entry.id, new_entry.path.string(), new_entry.url,
status.error());
+ new_asset_errors += "Failed to fetch '" + new_entry.id + "' from '" +
new_entry.url + "': " + status.error() + "\n";
}
} else {
logger_->log_info("Asset (id = '{}', path = '{}') already exists",
new_entry.id, new_entry.path.string());
new_state.assets.insert(new_entry);
}
}
- if (fetch_errors.empty()) {
- new_state.digest = layout.digest;
- }
for (auto& old_entry : state_.assets) {
if (std::find_if(layout.assets.begin(), layout.assets.end(), [&] (auto&
entry) {return entry.id == old_entry.id;}) == layout.assets.end()) {
logger_->log_info("We no longer need asset (id = '{}', path = '{}')",
old_entry.id, old_entry.path.string());
- std::filesystem::remove(root_ / old_entry.path);
+ std::error_code ec;
+ std::filesystem::remove(root_ / old_entry.path, ec);
+ if (ec) {
+ logger_->log_error("Failed to delete obsolete asset (id = '{}', path =
'{}')", old_entry.id, old_entry.path.string());
+ } else {
+ logger_->log_info("Successfully deleted obsolete asset (id = '{}',
path = '{}')", old_entry.id, old_entry.path.string());
+ }
}
}
- for (auto& [path, content] : new_file_contents) {
- create_dir((root_ / path).parent_path());
- std::ofstream{root_ / path, std::ios::binary}.write(reinterpret_cast<const
char*>(content.data()), gsl::narrow<std::streamsize>(content.size()));
+ for (auto& asset : new_assets) {
+ auto full_path = root_ / asset.path;
+ if (utils::file::create_dir(full_path.parent_path()) != 0) {
+ logger_->log_error("Failed to create asset directory '{}'",
full_path.parent_path());
+ new_asset_errors += fmt::format("Failed to create asset directory '{}'",
full_path.parent_path()) + "\n";
+ new_state.assets.erase(asset);
+ } else {
+ std::error_code ec;
+ std::filesystem::rename(full_path.string() + ".part", full_path, ec);
+ if (ec) {
+ logger_->log_error("Failed to move temporary asset file '{}' to '{}'",
full_path.string() + ".part", full_path.string());
+ new_asset_errors += fmt::format("Failed to move temporary asset file
'{}' to '{}'", full_path.string() + ".part", full_path.string()) + "\n";
+ new_state.assets.erase(asset);
+ } else {
+ logger_->log_info("Successfully moved temporary asset file '{}' to
'{}'", full_path.string() + ".part", full_path.string());
+ }
+ }
+ }
+
+ if (new_asset_errors.empty()) {
+ new_state.digest = layout.digest;
+ } else {
+ new_state.digest.clear();
Review Comment:
What if some assets were successfully downloaded, but not all?
##########
core-framework/src/http/HTTPClient.cpp:
##########
@@ -398,8 +398,8 @@ const char *HTTPClient::getContentType() {
const std::vector<char> &HTTPClient::getResponseBody() {
if (response_data_.response_body.empty()) {
- if (read_callback_) {
- response_data_.response_body = read_callback_->to_string();
+ if (auto byte_output =
dynamic_cast<utils::ByteOutputCallback*>(read_callback_.get())) {
+ response_data_.response_body = byte_output->to_string();
Review Comment:
```suggestion
if (auto byte_output_callback =
dynamic_cast<utils::ByteOutputCallback*>(read_callback_.get())) {
response_data_.response_body = byte_output_callback->to_string();
```
##########
libminifi/src/c2/C2Agent.cpp:
##########
@@ -1140,25 +1164,32 @@ void C2Agent::handleAssetUpdate(const
C2ContentResponse& resp) {
return;
}
- C2Payload file_response = protocol_->fetch(url);
-
- if (file_response.getStatus().getState() !=
state::UpdateState::READ_COMPLETE) {
- send_error("Failed to fetch asset from '" + url + "'");
- return;
- }
-
- auto raw_data = std::move(file_response).moveRawData();
// ensure directory exists for file
if (utils::file::create_dir(file_path.parent_path()) != 0) {
send_error("Failed to create directory '" +
file_path.parent_path().string() + "'");
return;
}
- {
- std::ofstream file{file_path, std::ofstream::binary};
- file.write(reinterpret_cast<const char*>(raw_data.data()),
gsl::narrow<std::streamsize>(raw_data.size()));
+ std::filesystem::path tmp_file{file_path.string() + ".part"};
+
+ std::ofstream file{tmp_file, std::ofstream::binary};
+ if (!file) {
+ send_error("Failed to open asset file to write '" + tmp_file.string() +
"'");
+ return;
+ }
+ bool success = protocol_->fetch(url, [&] (std::span<const char> chunk) {
+ file.write(chunk.data(), gsl::narrow<std::streamsize>(chunk.size()));
+ return file.good();
+ });
+ file.close();
+ if (!file || !success) {
+ std::filesystem::remove(tmp_file);
+ send_error("Failed to fetch asset from '" + url + "'");
+ return;
}
+ std::filesystem::rename(tmp_file, file_path);
Review Comment:
This snippet largely duplicates the previous change in the same file;
I'd try extracting the common parts.
##########
core-framework/include/http/HTTPClient.h:
##########
@@ -226,16 +230,17 @@ class HTTPClient : public BaseHTTPClient, public
core::ConnectableImpl {
void configure_secure_connection();
- std::chrono::milliseconds getAbsoluteTimeout() const { return
3*read_timeout_; }
+ std::chrono::milliseconds getAbsoluteTimeout() const { return
absolute_timeout_.value_or(3*read_timeout_); }
Review Comment:
I'd move the logic for the default outside of these getters and setters:
initialize the timeout to 3*read_timeout_ at the usage site, and if it
remains an empty optional, treat the empty state as meaning no timeout.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]