adamdebreceni commented on code in PR #2088:
URL: https://github.com/apache/nifi-minifi-cpp/pull/2088#discussion_r2732357178
##########
libminifi/src/utils/file/AssetManager.cpp:
##########
@@ -121,51 +121,74 @@ std::string AssetManager::hash() const {
nonstd::expected<void, std::string> AssetManager::sync(
const AssetLayout& layout,
- const std::function<nonstd::expected<std::vector<std::byte>,
std::string>(std::string_view /*url*/)>& fetch) {
+ const std::function<nonstd::expected<void, std::string>(std::string_view
/*url*/, const std::filesystem::path& /*tmp_path*/)>& fetch) {
logger_->log_info("Synchronizing assets");
std::lock_guard lock(mtx_);
AssetLayout new_state{
.digest = state_.digest,
.assets = {}
};
- std::string fetch_errors;
- std::vector<std::pair<std::filesystem::path, std::vector<std::byte>>>
new_file_contents;
+ std::string new_asset_errors;
+ std::vector<AssetDescription> new_assets;
for (auto& new_entry : layout.assets) {
if (std::find_if(state_.assets.begin(), state_.assets.end(), [&] (auto&
entry) {return entry.id == new_entry.id;}) == state_.assets.end()) {
logger_->log_info("Fetching asset (id = '{}', path = '{}') from {}",
new_entry.id, new_entry.path.string(), new_entry.url);
- if (auto data = fetch(new_entry.url)) {
- new_file_contents.emplace_back(new_entry.path, data.value());
+ if (auto status = fetch(new_entry.url, (root_ / new_entry.path).string()
+ ".part")) {
+ new_assets.emplace_back(new_entry);
new_state.assets.insert(new_entry);
} else {
- logger_->log_error("Failed to fetch asset (id = '{}', path = '{}')
from {}: {}", new_entry.id, new_entry.path.string(), new_entry.url,
data.error());
- fetch_errors += "Failed to fetch '" + new_entry.id + "' from '" +
new_entry.url + "': " + data.error() + "\n";
+ logger_->log_error("Failed to fetch asset (id = '{}', path = '{}')
from {}: {}", new_entry.id, new_entry.path.string(), new_entry.url,
status.error());
+ new_asset_errors += "Failed to fetch '" + new_entry.id + "' from '" +
new_entry.url + "': " + status.error() + "\n";
}
} else {
logger_->log_info("Asset (id = '{}', path = '{}') already exists",
new_entry.id, new_entry.path.string());
new_state.assets.insert(new_entry);
}
}
- if (fetch_errors.empty()) {
- new_state.digest = layout.digest;
- }
for (auto& old_entry : state_.assets) {
if (std::find_if(layout.assets.begin(), layout.assets.end(), [&] (auto&
entry) {return entry.id == old_entry.id;}) == layout.assets.end()) {
logger_->log_info("We no longer need asset (id = '{}', path = '{}')",
old_entry.id, old_entry.path.string());
- std::filesystem::remove(root_ / old_entry.path);
+ std::error_code ec;
+ std::filesystem::remove(root_ / old_entry.path, ec);
+ if (ec) {
+ logger_->log_error("Failed to delete obsolete asset (id = '{}', path =
'{}')", old_entry.id, old_entry.path.string());
+ } else {
+ logger_->log_info("Successfully deleted obsolete asset (id = '{}',
path = '{}')", old_entry.id, old_entry.path.string());
+ }
}
}
- for (auto& [path, content] : new_file_contents) {
- create_dir((root_ / path).parent_path());
- std::ofstream{root_ / path, std::ios::binary}.write(reinterpret_cast<const
char*>(content.data()), gsl::narrow<std::streamsize>(content.size()));
+ for (auto& asset : new_assets) {
+ auto full_path = root_ / asset.path;
+ if (utils::file::create_dir(full_path.parent_path()) != 0) {
+ logger_->log_error("Failed to create asset directory '{}'",
full_path.parent_path());
+ new_asset_errors += fmt::format("Failed to create asset directory '{}'",
full_path.parent_path()) + "\n";
+ new_state.assets.erase(asset);
+ } else {
+ std::error_code ec;
+ std::filesystem::rename(full_path.string() + ".part", full_path, ec);
+ if (ec) {
+ logger_->log_error("Failed to move temporary asset file '{}' to '{}'",
full_path.string() + ".part", full_path.string());
+ new_asset_errors += fmt::format("Failed to move temporary asset file
'{}' to '{}'", full_path.string() + ".part", full_path.string()) + "\n";
+ new_state.assets.erase(asset);
+ } else {
+ logger_->log_info("Successfully moved temporary asset file '{}' to
'{}'", full_path.string() + ".part", full_path.string());
+ }
+ }
+ }
+
+ if (new_asset_errors.empty()) {
+ new_state.digest = layout.digest;
+ } else {
+ new_state.digest.clear();
Review Comment:
we are not atomic in terms of asset layout, we report an empty digest but
store the downloaded assets' information, meaning when the c2-protocol triggers
a new asset sync, only the missing assets will be downloaded
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]