Github user phrocker commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/148#discussion_r145692128
  
    --- Diff: libminifi/src/processors/FocusArchiveEntry.cpp ---
    @@ -0,0 +1,340 @@
    +/**
    + * @file FocusArchiveEntry.cpp
    + * FocusArchiveEntry class implementation
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#include "processors/FocusArchiveEntry.h"
    +
    +#include <archive.h>
    +#include <archive_entry.h>
    +
    +#include <string.h>
    +
    +#include <boost/filesystem.hpp>
    +
    +#include <string>
    +#include <set>
    +
    +#include <iostream>
    +#include <fstream>
    +#include <memory>
    +
    +#include "core/ProcessContext.h"
    +#include "core/ProcessSession.h"
    +
    +#include "json/json.h"
    +#include "json/writer.h"
    +
    +namespace org {
    +namespace apache {
    +namespace nifi {
    +namespace minifi {
    +namespace processors {
    +
    +core::Property FocusArchiveEntry::Path(
    +    "Path",
    +    "The path within the archive to focus (\"/\" to focus the total 
archive)",
    +    "");
    +core::Relationship FocusArchiveEntry::Success(
    +    "success",
    +    "success operational on the flow record");
    +
    +bool 
FocusArchiveEntry::set_del_or_update_attr(std::shared_ptr<core::FlowFile> 
flowFile, const std::string key, std::string* value) const {
    +  if (value == nullptr)
    +    return flowFile->removeAttribute(key);
    +  else if (flowFile->updateAttribute(key, *value))
    +    return true;
    +  else
    +    return flowFile->addAttribute(key, *value);
    +}
    +
    +void FocusArchiveEntry::initialize() {
    +  //! Set the supported properties
    +  std::set<core::Property> properties;
    +  properties.insert(Path);
    +  setSupportedProperties(properties);
    +  //! Set the supported relationships
    +  std::set<core::Relationship> relationships;
    +  relationships.insert(Success);
    +  setSupportedRelationships(relationships);
    +}
    +
    +void FocusArchiveEntry::onTrigger(core::ProcessContext *context,
    +                                  core::ProcessSession *session) {
    +  auto flowFile = session->get();
    +  std::shared_ptr<FlowFileRecord> flowFileRecord = 
std::static_pointer_cast<FlowFileRecord>(flowFile);
    +
    +  if (!flowFile) {
    +    return;
    +  }
    +
    +  std::string targetEntry;
    +  context->getProperty(Path.getName(), targetEntry);
    +
    +  // Extract archive contents
    +  ArchiveMetadata archiveMetadata;
    +  archiveMetadata.focusedEntry = targetEntry;
    +  ReadCallback cb(&archiveMetadata);
    +  session->read(flowFile, &cb);
    +
    +  // For each extracted entry, import & stash to key
    +  std::string targetEntryStashKey;
    +
    +  for (auto &entryMetadata : archiveMetadata.entryMetadata) {
    +    if (entryMetadata.entryType == AE_IFREG) {
    +      logger_->log_info("FocusArchiveEntry importing %s from %s",
    +          entryMetadata.entryName.c_str(),
    +          entryMetadata.tmpFileName.c_str());
    +      session->import(entryMetadata.tmpFileName, flowFile, false, 0);
    +      char stashKey[37];
    +      uuid_t stashKeyUuid;
    +      uuid_generate(stashKeyUuid);
    +      uuid_unparse_lower(stashKeyUuid, stashKey);
    +      logger_->log_debug(
    +          "FocusArchiveEntry generated stash key %s for entry %s",
    +          stashKey,
    +          entryMetadata.entryName.c_str());
    +      entryMetadata.stashKey.assign(stashKey);
    +
    +      if (entryMetadata.entryName == targetEntry) {
    +        targetEntryStashKey = entryMetadata.stashKey;
    +      }
    +
    +      // Stash the content
    +      session->stash(entryMetadata.stashKey, flowFile);
    +    }
    +  }
    +
    +  // Restore target archive entry
    +  if (targetEntryStashKey != "") {
    +    session->restore(targetEntryStashKey, flowFile);
    +  } else {
    +    logger_->log_warn(
    +          "FocusArchiveEntry failed to locate target entry: %s",
    +          targetEntry.c_str());
    +  }
    +
    +  // Set new/updated lens stack to attribute
    +  {
    +    Json::Value lensStack;
    +    Json::Reader reader;
    +
    +    std::string existingLensStack;
    +
    +    if (flowFile->getAttribute("lens.archive.stack", existingLensStack)) {
    +      logger_->log_info("FocusArchiveEntry loading existing lens context");
    +      if (!reader.parse(existingLensStack, lensStack)) {
    +        logger_->log_error("FocusArchiveEntry JSON parse error: %s",
    +            reader.getFormattedErrorMessages());
    +        context->yield();
    +        return;
    +      }
    +    } else {
    +      lensStack = Json::Value(Json::arrayValue);
    +    }
    +
    +    Json::Value structVal(Json::arrayValue);
    +
    +    for (const auto &entryMetadata : archiveMetadata.entryMetadata) {
    +      Json::Value entryVal(Json::objectValue);
    +      entryVal["entry_name"] = Json::Value(entryMetadata.entryName);
    +      entryVal["entry_type"] = Json::Value(entryMetadata.entryType);
    +      entryVal["entry_perm"] = Json::Value(entryMetadata.entryPerm);
    +      entryVal["entry_size"] = Json::Value(entryMetadata.entrySize);
    +      entryVal["entry_uid"] = Json::Value(entryMetadata.entryUID);
    +      entryVal["entry_gid"] = Json::Value(entryMetadata.entryGID);
    +      entryVal["entry_mtime"] = Json::Value(entryMetadata.entryMTime);
    +      entryVal["entry_mtime_nsec"] = 
Json::Value(entryMetadata.entryMTimeNsec);
    +
    +      if (entryMetadata.entryType == AE_IFREG) {
    +          entryVal["stash_key"] = Json::Value(entryMetadata.stashKey);
    +      }
    +
    +      structVal.append(entryVal);
    +    }
    +
    +    std::string archivenameStr;
    +    Json::Value archiveName {Json::nullValue};
    +
    +    if (flowFile->getAttribute("filename", archivenameStr)) {
    +       archiveName = Json::Value(archivenameStr);
    +    }
    +
    +    Json::Value lensVal(Json::objectValue);
    +    lensVal["archive_format_name"] = 
Json::Value(archiveMetadata.archiveFormatName);
    +    lensVal["archive_name"] = archiveName;
    +    lensVal["focused_entry"] = Json::Value(archiveMetadata.focusedEntry);
    +    lensVal["archive_format"] = Json::Value(archiveMetadata.archiveFormat);
    +    lensVal["archive_structure"] = structVal;
    +    lensStack.append(lensVal);
    +
    +    Json::FastWriter writer;
    +    std::string stackStr = writer.write(lensStack);
    +
    +    if (!flowFile->updateAttribute("lens.archive.stack", stackStr)) {
    +      flowFile->addAttribute("lens.archive.stack", stackStr);
    +    }
    +  }
    +
    +  // Update filename attribute to that of focused entry
    +  std::size_t found = targetEntry.find_last_of("/\\");
    +  std::string path = targetEntry.substr(0, found);
    +  std::string name = targetEntry.substr(found + 1);
    +  set_del_or_update_attr(flowFile, "filename", &name);
    +  set_del_or_update_attr(flowFile, "path", &path);
    +  set_del_or_update_attr(flowFile, "absolute.path", &targetEntry);
    +
    +  // Transfer to the relationship
    +  session->transfer(flowFile, Success);
    +}
    +
    +typedef struct {
    +  std::shared_ptr<io::BaseStream> stream;
    +  char buf[8196];
    +} FocusArchiveEntryReadData;
    +
    +int64_t 
FocusArchiveEntry::ReadCallback::process(std::shared_ptr<io::BaseStream> 
stream) {
    +  auto inputArchive = archive_read_new();
    +  struct archive_entry *entry;
    +  int64_t nlen = 0;
    +
    +  FocusArchiveEntryReadData data;
    +  data.stream = stream;
    +
    +  archive_read_support_format_all(inputArchive);
    +  archive_read_support_filter_all(inputArchive);
    +
    +  // Read callback which reads from ifstream
    +  auto read = [] (archive *, void *d, const void **buf) -> int64_t {
    +    auto data = static_cast<FocusArchiveEntryReadData *>(d);
    +    *buf = data->buf;
    +    int64_t read = 0;
    +    int64_t last_read = 0;
    +
    +    do {
    +      last_read = data->stream->readData(reinterpret_cast<uint8_t 
*>(data->buf), 8196 - read);
    +      read += last_read;
    +    } while (last_read > 0 && read < 8196);
    +
    +    return read;
    +  };
    +
    +  // Close callback for libarchive
    +  auto close = [] (archive *, void *) -> int {
    +    // Because we do not need to close the stream, do nothing & return 
success
    +    return 0;
    +  };
    +
    +  // Read each item in the archive
    +  int res;
    +
    +  if ((res = archive_read_open(inputArchive, &data, NULL, read, close))) {
    +      logger_->log_error(
    +          "FocusArchiveEntry can't open due to archive error: %s",
    +          archive_error_string(inputArchive));
    +      return nlen;
    +  }
    +
    +  for (;;) {
    --- End diff --
    
    ^Same as above...isRunning() ?


---

Reply via email to