Github user phrocker commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/148#discussion_r145691328 --- Diff: libminifi/src/core/ProcessSession.cpp --- @@ -799,6 +799,152 @@ void ProcessSession::import(std::string source, std::shared_ptr<core::FlowFile> } } +bool ProcessSession::exportContent( + const std::string &destination, + const std::string &tmpFile, + std::shared_ptr<core::FlowFile> &flow, + bool keepContent) { + logger_->log_info( + "Exporting content of %s to %s", + flow->getUUIDStr().c_str(), + destination.c_str()); + + ReadCallback cb(tmpFile, destination, logger_); + read(flow, &cb); + + logger_->log_info("Committing %s", destination.c_str()); + bool commit_ok = cb.commit(); + + if (commit_ok) { + logger_->log_info("Commit OK."); + } else { + logger_->log_error( + "Commit of %s to %s failed!", + flow->getUUIDStr().c_str(), + destination.c_str()); + } + return commit_ok; +} + +bool ProcessSession::exportContent( + const std::string &destination, + std::shared_ptr<core::FlowFile> &flow, + bool keepContent) { + std::string tmpFileName = boost::filesystem::unique_path().native(); + return exportContent(destination, tmpFileName, flow, keepContent); +} + +ProcessSession::ReadCallback::ReadCallback(const std::string &tmpFile, + const std::string &destFile, + std::shared_ptr<logging::Logger> logger) + : _tmpFile(tmpFile), + _tmpFileOs(tmpFile, std::ios::binary), + _destFile(destFile), + logger_(logger) { +} + +// Copy the entire file contents to the temporary file +int64_t ProcessSession::ReadCallback::process(std::shared_ptr<io::BaseStream> stream) { + // Copy file contents into tmp file + _writeSucceeded = false; + size_t size = 0; + uint8_t buffer[8192]; + do { + int read = stream->read(buffer, 8192); + if (read < 0) { + return -1; + } + if (read == 0) { + break; + } + _tmpFileOs.write(reinterpret_cast<char*>(buffer), read); + size += read; + } while (size < stream->getSize()); + _writeSucceeded = true; + return size; +} + 
+// Renames tmp file to final destination +// Returns true if commit succeeded +bool ProcessSession::ReadCallback::commit() { + bool success = false; + + logger_->log_info("committing export operation to %s", _destFile.c_str()); + + if (_writeSucceeded) { + _tmpFileOs.close(); + + if (rename(_tmpFile.c_str(), _destFile.c_str())) { + logger_->log_info("commit export operation to %s failed because rename() call failed", _destFile.c_str()); + } else { + success = true; + logger_->log_info("commit export operation to %s succeeded", _destFile.c_str()); + } + } else { + logger_->log_error("commit export operation to %s failed because write failed", _destFile.c_str()); + } + return success; +} + +// Clean up resources +ProcessSession::ReadCallback::~ReadCallback() { + // Close tmp file + _tmpFileOs.close(); + + // Clean up tmp file, if necessary + unlink(_tmpFile.c_str()); +} + + +void ProcessSession::stash(const std::string &key, std::shared_ptr<core::FlowFile> flow) { --- End diff -- What are the ramifications if power is lost and the RocksDB repository WAL causes us to repeat this flow? Would that previous tmp file be left around?
---