Github user phrocker commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/156#discussion_r147532806 --- Diff: extensions/libarchive/ManipulateArchive.cpp --- @@ -0,0 +1,309 @@ +/** + * @file ManipulateArchive.cpp + * ManipulateArchive class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <string.h> +#include <iostream> +#include <fstream> +#include <memory> +#include <string> +#include <set> + +#include <boost/filesystem.hpp> + +#include <archive.h> +#include <archive_entry.h> + +#include "ManipulateArchive.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "core/FlowFile.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +core::Property ManipulateArchive::Operation("Operation", "Operation to perform on the archive", ""); +core::Property ManipulateArchive::Target("Target", "The path within the archive to perform the operation on", ""); +core::Property ManipulateArchive::Destination("Destination", "Destination for operations (move or copy) which result in new entries", ""); +core::Property ManipulateArchive::Before("Before", "For operations which result in new entries, places the new entry before the entry specified by this property", ""); +core::Property ManipulateArchive::After("After", "For operations which result in new entries, places the new entry after the entry specified by this property", ""); +core::Relationship ManipulateArchive::Success("success", "success operational on the flow record"); + +char const* ManipulateArchive::OPERATION_REMOVE = "remove"; +char const* ManipulateArchive::OPERATION_COPY = "copy"; +char const* ManipulateArchive::OPERATION_MOVE = "move"; +char const* ManipulateArchive::OPERATION_TOUCH = "touch"; + +void ManipulateArchive::initialize() { + //! Set the supported properties + std::set<core::Property> properties; + properties.insert(Operation); + properties.insert(Target); + properties.insert(Destination); + properties.insert(Before); + properties.insert(After); + setSupportedProperties(properties); + + //! Set the supported relationships + std::set<core::Relationship> relationships; + relationships.insert(Success); + setSupportedRelationships(relationships); +} + +void ManipulateArchive::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { + std::shared_ptr<core::FlowFile> flowFile = session->get(); + + if (!flowFile) { + return; + } + + std::string operation; + context->getProperty(Operation.getName(), operation); + + std::string targetEntry; + context->getProperty(Target.getName(), targetEntry); + + std::string destination; + context->getProperty(Destination.getName(), destination); + + std::string before; + context->getProperty(Before.getName(), before); + + std::string after; + context->getProperty(After.getName(), after); + + // TODO(calebj) Validate properties + + FocusArchiveEntry::ArchiveMetadata archiveMetadata; + FocusArchiveEntry::ReadCallback readCallback(this, &archiveMetadata); + session->read(flowFile, &readCallback); + + logger_->log_info("ManipulateArchive performing operation %s on %s", operation.c_str(), targetEntry.c_str()); + + // Perform operation: REMOVE + if (operation == OPERATION_REMOVE) { + for (auto it = archiveMetadata.entryMetadata.begin(); it != archiveMetadata.entryMetadata.end();) { + if ((*it).entryName == targetEntry) { + logger_->log_info("ManipulateArchive found entry %s for removal", targetEntry.c_str()); + std::remove((*it).tmpFileName.c_str()); + it = archiveMetadata.entryMetadata.erase(it); + break; + } else { + it++; + } + } + } + + // Perform operation: COPY + if (operation == OPERATION_COPY) { + bool found = false; + FocusArchiveEntry::ArchiveEntryMetadata copy; + + // Find item to copy + for (auto it = archiveMetadata.entryMetadata.begin(); it != archiveMetadata.entryMetadata.end();) { + if ((*it).entryName == targetEntry) { + logger_->log_info("ManipulateArchive found entry %s to copy", targetEntry.c_str()); + copy = *it; + found = true; + break; + } else { + it++; + } + } + + if (found) { + // Copy tmp file + const auto origTmpFileName = copy.tmpFileName; + const auto newTmpFileName = boost::filesystem::unique_path().native(); + copy.tmpFileName = newTmpFileName; + + { + std::ifstream src(origTmpFileName, std::ios::binary); + std::ofstream dst(newTmpFileName, std::ios::binary); + dst << src.rdbuf(); + } + + copy.entryName = destination; + + // Update metadata + if (after != "") { + for (auto it = archiveMetadata.entryMetadata.begin();;) { --- End diff -- minor nit ;;
---