Github user phrocker commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/148#discussion_r145692128 --- Diff: libminifi/src/processors/FocusArchiveEntry.cpp --- @@ -0,0 +1,340 @@ +/** + * @file FocusArchiveEntry.cpp + * FocusArchiveEntry class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "processors/FocusArchiveEntry.h" + +#include <archive.h> +#include <archive_entry.h> + +#include <string.h> + +#include <boost/filesystem.hpp> + +#include <string> +#include <set> + +#include <iostream> +#include <fstream> +#include <memory> + +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" + +#include "json/json.h" +#include "json/writer.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +core::Property FocusArchiveEntry::Path( + "Path", + "The path within the archive to focus (\"/\" to focus the total archive)", + ""); +core::Relationship FocusArchiveEntry::Success( + "success", + "success operational on the flow record"); + +bool FocusArchiveEntry::set_del_or_update_attr(std::shared_ptr<core::FlowFile> flowFile, const std::string key, std::string* value) const { + if (value == nullptr) + return flowFile->removeAttribute(key); + else if (flowFile->updateAttribute(key, *value)) + return true; + else + return flowFile->addAttribute(key, *value); +} + +void FocusArchiveEntry::initialize() { + //! Set the supported properties + std::set<core::Property> properties; + properties.insert(Path); + setSupportedProperties(properties); + //! 
Set the supported relationships + std::set<core::Relationship> relationships; + relationships.insert(Success); + setSupportedRelationships(relationships); +} + +void FocusArchiveEntry::onTrigger(core::ProcessContext *context, + core::ProcessSession *session) { + auto flowFile = session->get(); + std::shared_ptr<FlowFileRecord> flowFileRecord = std::static_pointer_cast<FlowFileRecord>(flowFile); + + if (!flowFile) { + return; + } + + std::string targetEntry; + context->getProperty(Path.getName(), targetEntry); + + // Extract archive contents + ArchiveMetadata archiveMetadata; + archiveMetadata.focusedEntry = targetEntry; + ReadCallback cb(&archiveMetadata); + session->read(flowFile, &cb); + + // For each extracted entry, import & stash to key + std::string targetEntryStashKey; + + for (auto &entryMetadata : archiveMetadata.entryMetadata) { + if (entryMetadata.entryType == AE_IFREG) { + logger_->log_info("FocusArchiveEntry importing %s from %s", + entryMetadata.entryName.c_str(), + entryMetadata.tmpFileName.c_str()); + session->import(entryMetadata.tmpFileName, flowFile, false, 0); + char stashKey[37]; + uuid_t stashKeyUuid; + uuid_generate(stashKeyUuid); + uuid_unparse_lower(stashKeyUuid, stashKey); + logger_->log_debug( + "FocusArchiveEntry generated stash key %s for entry %s", + stashKey, + entryMetadata.entryName.c_str()); + entryMetadata.stashKey.assign(stashKey); + + if (entryMetadata.entryName == targetEntry) { + targetEntryStashKey = entryMetadata.stashKey; + } + + // Stash the content + session->stash(entryMetadata.stashKey, flowFile); + } + } + + // Restore target archive entry + if (targetEntryStashKey != "") { + session->restore(targetEntryStashKey, flowFile); + } else { + logger_->log_warn( + "FocusArchiveEntry failed to locate target entry: %s", + targetEntry.c_str()); + } + + // Set new/updated lens stack to attribute + { + Json::Value lensStack; + Json::Reader reader; + + std::string existingLensStack; + + if 
(flowFile->getAttribute("lens.archive.stack", existingLensStack)) { + logger_->log_info("FocusArchiveEntry loading existing lens context"); + if (!reader.parse(existingLensStack, lensStack)) { + logger_->log_error("FocusArchiveEntry JSON parse error: %s", + reader.getFormattedErrorMessages()); + context->yield(); + return; + } + } else { + lensStack = Json::Value(Json::arrayValue); + } + + Json::Value structVal(Json::arrayValue); + + for (const auto &entryMetadata : archiveMetadata.entryMetadata) { + Json::Value entryVal(Json::objectValue); + entryVal["entry_name"] = Json::Value(entryMetadata.entryName); + entryVal["entry_type"] = Json::Value(entryMetadata.entryType); + entryVal["entry_perm"] = Json::Value(entryMetadata.entryPerm); + entryVal["entry_size"] = Json::Value(entryMetadata.entrySize); + entryVal["entry_uid"] = Json::Value(entryMetadata.entryUID); + entryVal["entry_gid"] = Json::Value(entryMetadata.entryGID); + entryVal["entry_mtime"] = Json::Value(entryMetadata.entryMTime); + entryVal["entry_mtime_nsec"] = Json::Value(entryMetadata.entryMTimeNsec); + + if (entryMetadata.entryType == AE_IFREG) { + entryVal["stash_key"] = Json::Value(entryMetadata.stashKey); + } + + structVal.append(entryVal); + } + + std::string archivenameStr; + Json::Value archiveName {Json::nullValue}; + + if (flowFile->getAttribute("filename", archivenameStr)) { + archiveName = Json::Value(archivenameStr); + } + + Json::Value lensVal(Json::objectValue); + lensVal["archive_format_name"] = Json::Value(archiveMetadata.archiveFormatName); + lensVal["archive_name"] = archiveName; + lensVal["focused_entry"] = Json::Value(archiveMetadata.focusedEntry); + lensVal["archive_format"] = Json::Value(archiveMetadata.archiveFormat); + lensVal["archive_structure"] = structVal; + lensStack.append(lensVal); + + Json::FastWriter writer; + std::string stackStr = writer.write(lensStack); + + if (!flowFile->updateAttribute("lens.archive.stack", stackStr)) { + flowFile->addAttribute("lens.archive.stack", 
stackStr); + } + } + + // Update filename attribute to that of focused entry + std::size_t found = targetEntry.find_last_of("/\\"); + std::string path = targetEntry.substr(0, found); + std::string name = targetEntry.substr(found + 1); + set_del_or_update_attr(flowFile, "filename", &name); + set_del_or_update_attr(flowFile, "path", &path); + set_del_or_update_attr(flowFile, "absolute.path", &targetEntry); + + // Transfer to the relationship + session->transfer(flowFile, Success); +} + +typedef struct { + std::shared_ptr<io::BaseStream> stream; + char buf[8196]; +} FocusArchiveEntryReadData; + +int64_t FocusArchiveEntry::ReadCallback::process(std::shared_ptr<io::BaseStream> stream) { + auto inputArchive = archive_read_new(); + struct archive_entry *entry; + int64_t nlen = 0; + + FocusArchiveEntryReadData data; + data.stream = stream; + + archive_read_support_format_all(inputArchive); + archive_read_support_filter_all(inputArchive); + + // Read callback which reads from ifstream + auto read = [] (archive *, void *d, const void **buf) -> int64_t { + auto data = static_cast<FocusArchiveEntryReadData *>(d); + *buf = data->buf; + int64_t read = 0; + int64_t last_read = 0; + + do { + last_read = data->stream->readData(reinterpret_cast<uint8_t *>(data->buf), 8196 - read); + read += last_read; + } while (last_read > 0 && read < 8196); + + return read; + }; + + // Close callback for libarchive + auto close = [] (archive *, void *) -> int { + // Because we do not need to close the stream, do nothing & return success + return 0; + }; + + // Read each item in the archive + int res; + + if ((res = archive_read_open(inputArchive, &data, NULL, read, close))) { + logger_->log_error( + "FocusArchiveEntry can't open due to archive error: %s", + archive_error_string(inputArchive)); + return nlen; + } + + for (;;) { --- End diff -- ^ Same as above: should this loop also check isRunning()?
---