fgerlits commented on a change in pull request #1170:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1170#discussion_r706076725



##########
File path: extensions/standard-processors/processors/ReplaceText.cpp
##########
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
#include "ReplaceText.h"

#include <algorithm>
#include <iterator>
#include <optional>
#include <regex>
#include <string>
#include <vector>

#include "core/TypedValues.h"
#include "core/logging/LoggerConfiguration.h"
#include "utils/LineByLineInputOutputStreamCallback.h"
+
+namespace org::apache::nifi::minifi::processors {
+
// "Evaluation Mode": chooses whether the replacement strategy runs on each line separately
// or on the whole content buffered in memory. Required; defaults to
// EvaluationModeType::LINE_BY_LINE. Parsed into evaluation_mode_ in onSchedule() and
// dispatched on in onTrigger().
const core::Property ReplaceText::EvaluationMode = core::PropertyBuilder::createProperty("Evaluation Mode")
    ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) or "
                      "buffer the entire file into memory (Entire Text) and run against that.")
    ->isRequired(true)
    ->withDefaultValue<std::string>(toString(EvaluationModeType::LINE_BY_LINE))
    ->withAllowableValues(EvaluationModeType::values())
    ->build();
+
// "Line-by-Line Evaluation Mode": selects which lines receive replacements (all, first,
// last, all but first, all but last). Only consulted by replaceTextLineByLine(), i.e. in
// Line-by-Line evaluation mode. Optional; defaults to LineByLineEvaluationModeType::ALL.
const core::Property ReplaceText::LineByLineEvaluationMode = core::PropertyBuilder::createProperty("Line-by-Line Evaluation Mode")
    ->withDescription("Run the 'Replacement Strategy' against each line separately (Line-by-Line) for All lines in the FlowFile, "
                      "First Line (Header) only, Last Line (Footer) only, all Except the First Line (Header) or all Except the Last Line (Footer).")
    ->isRequired(false)
    ->withDefaultValue<std::string>(toString(LineByLineEvaluationModeType::ALL))
    ->withAllowableValues(LineByLineEvaluationModeType::values())
    ->build();
+
// "Replacement Strategy": selects the transformation applied by applyReplacements()
// (prepend, append, regex replace, literal replace, always replace, substitute variables).
// Required; defaults to ReplacementStrategyType::REGEX_REPLACE. It also controls whether
// Search Value / Replacement Value are read with expression-language evaluation.
const core::Property ReplaceText::ReplacementStrategy = core::PropertyBuilder::createProperty("Replacement Strategy")
    ->withDescription("The strategy for how and what to replace within the FlowFile's text content. "
                      "Substitute Variables replaces ${attribute_name} placeholders with the corresponding attribute's value "
                      "(if an attribute is not found, the placeholder is kept as it was).")
    ->isRequired(true)
    ->withDefaultValue(toString(ReplacementStrategyType::REGEX_REPLACE))
    ->withAllowableValues(ReplacementStrategyType::values())
    ->build();
+
// "Maximum Buffer Size": data-size limit (default "1 MB"). In Entire Text mode, flow files
// whose size exceeds it are routed to Failure before any reading happens. In Line-by-Line
// mode it is handed to LineByLineInputOutputStreamCallback as the per-line limit.
const core::Property ReplaceText::MaximumBufferSize = core::PropertyBuilder::createProperty("Maximum Buffer Size")
    ->withDescription("Specifies the maximum amount of data to buffer (per file or per line, depending on the Evaluation Mode) "
                      "in order to apply the replacement. "
                      "In 'Entire Text' evaluation mode, if the FlowFile is larger than this value, the FlowFile will be routed to 'failure'. "
                      "In 'Line-by-Line' evaluation mode, if a single line is larger than this value, the FlowFile will be routed to 'failure'. "
                      "A default value of 1 MB is provided, primarily for 'Entire Text' mode. In 'Line-by-Line' mode, a value such as 8 KB or 16 KB is suggested. ")
    ->isRequired(false)
    ->withDefaultValue<core::DataSizeValue>("1 MB")
    ->build();
+
// "Search Value": the text (Literal Replace) or regular expression (Regex Replace) to look
// for. Declared optional, but readSearchValueProperty() throws at trigger time if it is
// empty under either of those two strategies. Expression language is evaluated against
// the flow file except in Regex Replace mode, where the raw value is compiled as a regex.
const core::Property ReplaceText::SearchValue = core::PropertyBuilder::createProperty("Search Value")
    ->withDescription("The Search Value to search for in the FlowFile content. "
                      "Only used for 'Literal Replace' and 'Regex Replace' matching strategies. "
                      "Supports expression language except in Regex Replace mode.")
    ->isRequired(false)
    ->supportsExpressionLanguage(true)
    ->build();
+
// "Replacement Value": the text inserted by the chosen strategy. In Regex Replace mode it
// is passed to std::regex_replace as the format string, so $&, $1, ... back-references
// apply. Required; readReplacementValueProperty() throws if it cannot be read.
const core::Property ReplaceText::ReplacementValue = core::PropertyBuilder::createProperty("Replacement Value")
    ->withDescription("The value to insert using the 'Replacement Strategy'. "
                      "Using 'Regex Replace' back-references to Regular Expression capturing groups are supported: "
                      "$& is the entire matched substring, $1, $2, ... are the matched capturing groups. Use $$1 for a literal $1. "
                      "Back-references to non-existent capturing groups will be replaced by empty strings. "
                      "Supports expression language except in Regex Replace mode.")
    ->isRequired(true)
    ->supportsExpressionLanguage(true)
    ->build();
+
// Outgoing relationships. Processed flow files go to Success whether or not any text was
// actually replaced. Flow files that hit an error caught inside replaceTextInEntireFile()
// or replaceTextLineByLine() (or that exceed Maximum Buffer Size in Entire Text mode) go
// to Failure.
const core::Relationship ReplaceText::Success("success", "FlowFiles that have been successfully processed are routed to this relationship. "
                                                         "This includes both FlowFiles that had text replaced and those that did not.");
const core::Relationship ReplaceText::Failure("failure", "FlowFiles that could not be updated are routed to this relationship.");
+
+ReplaceText::ReplaceText(const std::string& name, const utils::Identifier& 
uuid)
+  : core::Processor(name, uuid),
+    logger_(logging::LoggerFactory<ReplaceText>::getLogger()) {
+}
+
+core::annotation::Input ReplaceText::getInputRequirement() const {
+  return core::annotation::Input::INPUT_REQUIRED;
+}
+
+void ReplaceText::initialize() {
+  setSupportedProperties({
+      SearchValue,
+      ReplacementValue,
+      MaximumBufferSize,
+      ReplacementStrategy,
+      EvaluationMode,
+      LineByLineEvaluationMode
+  });
+  setSupportedRelationships({
+      Success,
+      Failure
+  });
+}
+
+void ReplaceText::onSchedule(const std::shared_ptr<core::ProcessContext>& 
context, const std::shared_ptr<core::ProcessSessionFactory>&) {
+  gsl_Expects(context);
+
+  const std::optional<std::string> evaluation_mode = 
context->getProperty(EvaluationMode);
+  evaluation_mode_ = 
EvaluationModeType::parse(evaluation_mode.value().c_str());
+  logger_->log_debug("the %s property is set to %s", EvaluationMode.getName(), 
evaluation_mode_.toString());
+
+  const std::optional<std::string> line_by_line_evaluation_mode = 
context->getProperty(LineByLineEvaluationMode);
+  if (line_by_line_evaluation_mode) {
+    line_by_line_evaluation_mode_ = 
LineByLineEvaluationModeType::parse(line_by_line_evaluation_mode->c_str());
+    logger_->log_debug("the %s property is set to %s", 
LineByLineEvaluationMode.getName(), line_by_line_evaluation_mode_.toString());
+  }
+
+  const std::optional<std::string> replacement_strategy = 
context->getProperty(ReplacementStrategy);
+  replacement_strategy_ = 
ReplacementStrategyType::parse(replacement_strategy.value().c_str());
+  logger_->log_debug("the %s property is set to %s", 
ReplacementStrategy.getName(), replacement_strategy_.toString());
+
+  context->getProperty(MaximumBufferSize.getName(), maximum_buffer_size_);
+  logger_->log_debug("the %s property is set to %" PRIu64 " bytes", 
MaximumBufferSize.getName(), maximum_buffer_size_);
+}
+
+void ReplaceText::onTrigger(const std::shared_ptr<core::ProcessContext>& 
context, const std::shared_ptr<core::ProcessSession>& session) {
+  gsl_Expects(context);
+  gsl_Expects(session);
+
+  std::shared_ptr<core::FlowFile> flow_file = session->get();
+  if (!flow_file) {
+    logger_->log_trace("No flow file");
+    return;
+  }
+
+  readSearchValueProperty(context, flow_file);
+  readReplacementValueProperty(context, flow_file);
+
+  switch (evaluation_mode_.value()) {
+    case EvaluationModeType::ENTIRE_TEXT:
+      replaceTextInEntireFile(flow_file, session);
+      return;
+    case EvaluationModeType::LINE_BY_LINE:
+      replaceTextLineByLine(flow_file, session);
+      return;
+  }
+
+  throw Exception{PROCESSOR_EXCEPTION, 
utils::StringUtils::join_pack("Unsupported ", EvaluationMode.getName(), ": ", 
evaluation_mode_.toString())};
+}
+
+void ReplaceText::readSearchValueProperty(const 
std::shared_ptr<core::ProcessContext>& context, const 
std::shared_ptr<core::FlowFile>& flow_file) {
+  bool found_search_value;
+  if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+    found_search_value = context->getProperty(SearchValue.getName(), 
search_value_);
+  } else {
+    found_search_value = context->getProperty(SearchValue, search_value_, 
flow_file);
+  }
+  if (found_search_value) {
+    logger_->log_debug("the %s property is set to %s", SearchValue.getName(), 
search_value_);
+    if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+      search_regex_ = std::regex{search_value_};
+    }
+  }
+  if ((replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE || 
replacement_strategy_ == ReplacementStrategyType::LITERAL_REPLACE) && 
search_value_.empty()) {
+    throw Exception{PROCESSOR_EXCEPTION, utils::StringUtils::join_pack("Error: 
missing or empty ", SearchValue.getName(), " property")};
+  }
+}
+
+void ReplaceText::readReplacementValueProperty(const 
std::shared_ptr<core::ProcessContext>& context, const 
std::shared_ptr<core::FlowFile>& flow_file) {
+  bool found_replacement_value;
+  if (replacement_strategy_ == ReplacementStrategyType::REGEX_REPLACE) {
+    found_replacement_value = context->getProperty(ReplacementValue.getName(), 
replacement_value_);
+  } else {
+    found_replacement_value = context->getProperty(ReplacementValue, 
replacement_value_, flow_file);
+  }
+  if (found_replacement_value) {
+    logger_->log_debug("the %s property is set to %s", 
ReplacementValue.getName(), replacement_value_);
+  } else {
+    throw Exception{PROCESSOR_EXCEPTION, 
utils::StringUtils::join_pack("Missing required property: ", 
ReplacementValue.getName())};
+  }
+}
+
+namespace {
+
+struct ReadFlowFileIntoBuffer : public InputStreamCallback {
+  std::vector<uint8_t> buffer_;
+
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+    size_t bytes_read = stream->read(buffer_, stream->size());
+    return io::isError(bytes_read) ? -1 : gsl::narrow<int64_t>(bytes_read);
+  }
+};
+
+struct WriteBufferToFlowFile : public OutputStreamCallback {
+  const std::vector<uint8_t>& buffer_;
+
+  explicit WriteBufferToFlowFile(const std::vector<uint8_t>& buffer) : 
buffer_(buffer) {}
+
+  int64_t process(const std::shared_ptr<io::BaseStream>& stream) override {
+    size_t bytes_written = stream->write(buffer_, buffer_.size());
+    return io::isError(bytes_written) ? -1 : 
gsl::narrow<int64_t>(bytes_written);
+  }
+};
+
+}  // namespace
+
+void ReplaceText::replaceTextInEntireFile(const 
std::shared_ptr<core::FlowFile>& flow_file, const 
std::shared_ptr<core::ProcessSession>& session) const {
+  gsl_Expects(flow_file);
+  gsl_Expects(session);
+
+  if (flow_file->getSize() > maximum_buffer_size_) {
+    logger_->log_error("Flow file size %" PRIu64 " is larger than %s = %" 
PRIu64 " so transferring it to Failure",
+                       flow_file->getSize(), MaximumBufferSize.getName(), 
maximum_buffer_size_);
+    session->transfer(flow_file, Failure);
+    return;
+  }
+
+  try {
+    ReadFlowFileIntoBuffer read_callback;
+    session->read(flow_file, &read_callback);
+
+    std::string input{read_callback.buffer_.begin(), 
read_callback.buffer_.end()};
+    std::string output = applyReplacements(input, flow_file);
+    std::vector<uint8_t> modified_text{output.begin(), output.end()};
+
+    WriteBufferToFlowFile write_callback{modified_text};
+    session->write(flow_file, &write_callback);
+
+    session->transfer(flow_file, Success);
+  } catch (const Exception& exception) {
+    logger_->log_error("Error in ReplaceText (Entire text mode): %s", 
exception.what());
+    session->transfer(flow_file, Failure);
+  }
+}
+
+void ReplaceText::replaceTextLineByLine(const std::shared_ptr<core::FlowFile>& 
flow_file, const std::shared_ptr<core::ProcessSession>& session) const {
+  gsl_Expects(flow_file);
+  gsl_Expects(session);
+
+  try {
+    utils::LineByLineInputOutputStreamCallback 
read_write_callback{maximum_buffer_size_, logger_,
+        [this, &flow_file](const std::string& input_line, bool is_first_line, 
bool is_last_line) {
+      switch (line_by_line_evaluation_mode_.value()) {
+        case LineByLineEvaluationModeType::ALL:
+          return applyReplacements(input_line, flow_file);
+        case LineByLineEvaluationModeType::FIRST_LINE:
+          return is_first_line ? applyReplacements(input_line, flow_file) : 
input_line;
+        case LineByLineEvaluationModeType::LAST_LINE:
+          return is_last_line ? applyReplacements(input_line, flow_file) : 
input_line;
+        case LineByLineEvaluationModeType::EXCEPT_FIRST_LINE:
+          return is_first_line ? input_line : applyReplacements(input_line, 
flow_file);
+        case LineByLineEvaluationModeType::EXCEPT_LAST_LINE:
+          return is_last_line ? input_line: applyReplacements(input_line, 
flow_file);
+      }
+      throw Exception{PROCESSOR_EXCEPTION, 
utils::StringUtils::join_pack("Unsupported ", 
LineByLineEvaluationMode.getName(), ": ", 
line_by_line_evaluation_mode_.toString())};
+    }};
+    session->readWrite(flow_file, &read_write_callback);
+    session->transfer(flow_file, Success);
+  } catch (const Exception& exception) {
+    logger_->log_error("Error in ReplaceText (Line-by-Line mode): %s", 
exception.what());
+    session->transfer(flow_file, Failure);
+  }
+}
+
+std::string ReplaceText::applyReplacements(const std::string& input, const 
std::shared_ptr<core::FlowFile>& flow_file) const {
+  switch (replacement_strategy_.value()) {
+    case ReplacementStrategyType::PREPEND:
+      return replacement_value_ + input;
+
+    case ReplacementStrategyType::APPEND:
+      return input + replacement_value_;
+
+    case ReplacementStrategyType::REGEX_REPLACE:
+      return applyRegexReplace(input);
+
+    case ReplacementStrategyType::LITERAL_REPLACE:
+      return applyLiteralReplace(input);
+
+    case ReplacementStrategyType::ALWAYS_REPLACE:
+      return replacement_value_;
+
+    case ReplacementStrategyType::SUBSTITUTE_VARIABLES:
+      return applySubstituteVariables(input, flow_file);
+  }
+
+  throw Exception{PROCESSOR_EXCEPTION, 
utils::StringUtils::join_pack("Unsupported ", ReplacementStrategy.getName(), ": 
", replacement_strategy_.toString())};
+}
+
+std::string ReplaceText::applyRegexReplace(const std::string& input) const {
+  const auto [chomped_line, line_ending] = utils::StringUtils::chomp(input);
+  std::string output = std::regex_replace(chomped_line, search_regex_, 
replacement_value_);
+  return output + line_ending;
+}
+
+std::string ReplaceText::applyLiteralReplace(const std::string& input) const {
+  std::vector<char> output;
+  output.reserve(input.size());
+
+  auto it = input.begin();
+  do {
+    auto found = std::search(it, input.end(), search_value_.begin(), 
search_value_.end());
+    if (found != input.end()) {
+      std::copy(it, found, std::back_inserter(output));
+      std::copy(replacement_value_.begin(), replacement_value_.end(), 
std::back_inserter(output));
+      it = found;
+      std::advance(it, search_value_.size());
+    } else {
+      std::copy(it, input.end(), std::back_inserter(output));
+      it = input.end();
+    }
+  } while (it != input.end());
+
+  return std::string{output.begin(), output.end()};
+}
+
+std::string ReplaceText::applySubstituteVariables(const std::string& input, 
const std::shared_ptr<core::FlowFile>& flow_file) const {
+  static const std::regex PLACEHOLDER{R"(\$\{([^}]+)\})"};

Review comment:
       That was my original plan, too, but the expression language substitution
logic is encapsulated in the expression-language extension, and I didn't think
exposing it belonged in this PR. Also, full expression language support was
not a requirement. We can do this later in a separate PR.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to