[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1188: MINIFICPP-1651: Added DefragmentText processor

2021-10-29 Thread GitBox


fgerlits commented on a change in pull request #1188:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1188#discussion_r739234364



##
File path: extensions/standard-processors/tests/unit/DefragmentTextTests.cpp
##
@@ -0,0 +1,273 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TestBase.h"
+#include "WriteToFlowFileTestProcessor.h"
+#include "ReadFromFlowFileTestProcessor.h"
+#include "UpdateAttribute.h"
+#include "DefragmentText.h"
+#include "TextFragmentUtils.h"
+#include "utils/TestUtils.h"
+#include "serialization/PayloadSerializer.h"
+#include "serialization/FlowFileSerializer.h"
+#include "unit/ContentRepositoryDependentTests.h"
+
+using WriteToFlowFileTestProcessor = 
org::apache::nifi::minifi::processors::WriteToFlowFileTestProcessor;
+using ReadFromFlowFileTestProcessor = 
org::apache::nifi::minifi::processors::ReadFromFlowFileTestProcessor;
+using UpdateAttribute = org::apache::nifi::minifi::processors::UpdateAttribute;
+using DefragmentText = org::apache::nifi::minifi::processors::DefragmentText;
+
+// With the pattern at the start of messages (the default Pattern Location), each message is
+// expected to be emitted only after the next message's leading pattern has arrived.
+TEST_CASE("DefragmentTextNoMultilinePatternAtStartTest", "[defragmenttextnomultilinepatternatstarttest]") {
+  TestController testController;
+  auto plan = testController.createPlan();
+  auto write_to_flow_file = std::dynamic_pointer_cast<WriteToFlowFileTestProcessor>(
+      plan->addProcessor("WriteToFlowFileTestProcessor", "write_to_flow_file"));
+  auto defrag_text_flow_files = std::dynamic_pointer_cast<DefragmentText>(
+      plan->addProcessor("DefragmentText", "defrag_text_flow_files", core::Relationship("success", "description"), true));
+  auto read_from_flow_file = std::dynamic_pointer_cast<ReadFromFlowFileTestProcessor>(
+      plan->addProcessor("ReadFromFlowFileTestProcessor", "read_from_flow_file", DefragmentText::Success, true));
+  plan->setProperty(defrag_text_flow_files, DefragmentText::Pattern.getName(), "<[0-9]+>");
+
+  write_to_flow_file->setContent("<1> Foo");
+  testController.runSession(plan);
+  // "<1> Foo" could still continue in the next flow file, so nothing is released yet
+  CHECK(read_from_flow_file->numberOfFlowFilesRead() == 0);
+  write_to_flow_file->setContent("<2> Bar");
+  plan->reset();
+  testController.runSession(plan);
+  CHECK(read_from_flow_file->readFlowFileWithContent("<1> Foo"));
+  write_to_flow_file->setContent("<3> Baz");
+  plan->reset();
+  testController.runSession(plan);
+  CHECK(read_from_flow_file->readFlowFileWithContent("<2> Bar"));
+}
+
+// An empty Pattern must make scheduling fail with a PROCESS_SCHEDULE_EXCEPTION.
+TEST_CASE("DefragmentTextEmptyPattern", "[defragmenttextemptypattern]") {
+  TestController testController;
+  auto plan = testController.createPlan();
+  // The upstream and downstream processors only complete the flow; this test does not inspect them.
+  plan->addProcessor("WriteToFlowFileTestProcessor", "write_to_flow_file");
+  auto defrag_text_flow_files = std::dynamic_pointer_cast<DefragmentText>(
+      plan->addProcessor("DefragmentText", "defrag_text_flow_files", core::Relationship("success", "description"), true));
+  plan->addProcessor("ReadFromFlowFileTestProcessor", "read_from_flow_file", DefragmentText::Success, true);
+  plan->setProperty(defrag_text_flow_files, DefragmentText::Pattern.getName(), "");
+  plan->setProperty(defrag_text_flow_files, DefragmentText::PatternLoc.getName(), toString(DefragmentText::PatternLocation::END_OF_MESSAGE));
+
+  REQUIRE_THROWS_WITH(testController.runSession(plan), "Process Schedule Operation: Pattern property missing or invalid");
+}
+
+TEST_CASE("DefragmentTextNoMultilinePatternAtEndTest", 
"[defragmenttextnomultilinepatternatendtest]") {
+  TestController testController;
+  std::shared_ptr plan = testController.createPlan();
+  std::shared_ptr write_to_flow_file = 
std::dynamic_pointer_cast(
+  plan->addProcessor("WriteToFlowFileTestProcessor", 
"write_to_flow_file"));
+  std::shared_ptr defrag_text_flow_files =  
std::dynamic_pointer_cast(
+  plan->addProcessor("DefragmentText", "defrag_text_flow_files", 
core::Relationship("success", "description"), true));
+  std::shared_ptr read_from_flow_file = 
std::dynamic_pointer_cast(
+  plan->addProcessor("ReadFromFlowFileTestProcessor", 
"read_from_flow_file", DefragmentText::Success, true));

Review comment:
   Using sections would require 

[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1188: MINIFICPP-1651: Added DefragmentText processor

2021-10-29 Thread GitBox


fgerlits commented on a change in pull request #1188:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1188#discussion_r739169606



##
File path: extensions/standard-processors/processors/DefragmentText.cpp
##
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "DefragmentText.h"
+
+#include 
+
+#include "core/Resource.h"
+#include "serialization/PayloadSerializer.h"
+#include "TextFragmentUtils.h"
+#include "utils/gsl.h"
+#include "utils/StringUtils.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+// Relationships
+const core::Relationship DefragmentText::Success("success", "Flowfiles that have been successfully defragmented");
+const core::Relationship DefragmentText::Failure("failure", "Flowfiles that failed the defragmentation process");
+const core::Relationship DefragmentText::Self("__self__", "Marks the FlowFile to be owned by this processor");
+
+// Properties
+const core::Property DefragmentText::Pattern(
+    core::PropertyBuilder::createProperty("Pattern")
+        ->withDescription("A regular expression to match at the start or end of messages.")
+        ->isRequired(true)->build());
+
+const core::Property DefragmentText::PatternLoc(
+    core::PropertyBuilder::createProperty("Pattern Location")
+        ->withDescription("Whether the pattern is located at the start or at the end of the messages.")
+        ->withAllowableValues(PatternLocation::values())
+        ->withDefaultValue(toString(PatternLocation::START_OF_MESSAGE))->build());
+
+const core::Property DefragmentText::MaxBufferSize(
+    core::PropertyBuilder::createProperty("Max Buffer Size")
+        ->withDescription("The maximum buffer size, if the buffer exceeds this, it will be transferred to failure. Expected format is <size> <size unit>")
+        ->withType(core::StandardValidators::get().DATA_SIZE_VALIDATOR)->build());
+
+// Validated like Max Buffer Size, so that malformed durations are rejected at configuration time
+const core::Property DefragmentText::MaxBufferAge(
+    core::PropertyBuilder::createProperty("Max Buffer Age")
+        ->withDescription("The maximum age of a buffer after which the buffer will be transferred to failure. Expected format is <duration> <time unit>")
+        ->withType(core::StandardValidators::get().TIME_PERIOD_VALIDATOR)->build());
+
+void DefragmentText::initialize() {
+  // Advertise the configurable properties first, then the Success and Failure relationships.
+  setSupportedProperties({Pattern, PatternLoc, MaxBufferAge, MaxBufferSize});
+  setSupportedRelationships({Success, Failure});
+}
+
+void DefragmentText::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory*) {
+  gsl_Expects(context);
+
+  // Max Buffer Age is optional; a value that cannot be parsed as a time period is reported and ignored.
+  std::string max_buffer_age_str;
+  if (context->getProperty(MaxBufferAge.getName(), max_buffer_age_str)) {
+    core::TimeUnit unit;
+    uint64_t max_buffer_age;
+    if (core::Property::StringToTime(max_buffer_age_str, max_buffer_age, unit) && core::Property::ConvertTimeUnitToMS(max_buffer_age, unit, max_buffer_age)) {
+      buffer_.setMaxAge(std::chrono::milliseconds(max_buffer_age));
+      logger_->log_trace("The Buffer maximum age is configured to be %" PRIu64 " ms", max_buffer_age);
+    } else {
+      logger_->log_warn("Ignoring invalid %s value: %s", MaxBufferAge.getName(), max_buffer_age_str);
+    }
+  }
+
+  // Max Buffer Size is optional; a size of 0 leaves the buffer's size limit unset.
+  std::string max_buffer_size_str;
+  if (context->getProperty(MaxBufferSize.getName(), max_buffer_size_str)) {
+    uint64_t max_buffer_size = core::DataSizeValue(max_buffer_size_str).getValue();
+    if (max_buffer_size > 0) {
+      buffer_.setMaxSize(max_buffer_size);
+      logger_->log_trace("The Buffer maximum size is configured to be %" PRIu64 " B", max_buffer_size);
+    }
+  }
+
+  context->getProperty(PatternLoc.getName(), pattern_location_);
+
+  // Pattern is required: an empty value or a malformed regular expression prevents scheduling.
+  std::string pattern_str;
+  if (!context->getProperty(Pattern.getName(), pattern_str) || pattern_str.empty()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Pattern property missing or invalid");
+  }
+  try {
+    pattern_ = std::regex(pattern_str);
+  } catch (const std::regex_error& error) {
+    // std::regex reports a malformed pattern by throwing; turn it into a scheduling failure
+    // instead of letting std::regex_error escape onSchedule.
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, std::string("Pattern property missing or invalid: ") + error.what());
+  }
+  logger_->log_trace("The Pattern is configured to be %s", pattern_str);
+}
+
+void DefragmentText::onTrigger(core::ProcessContext*, core::ProcessSession* 
session) {
+  gsl_Expects(session);
+  auto flowFiles = flow_file_store_.getNewFlowFiles();
+  for (auto& file : flowFiles) {
+processNextFragment(session, file);
+  }
+  std::shared_ptr original_flow_file = session->get();
+  processNextFragment(session, original_flow_file);
+  if (buffer_.maxAgeReached() || buffer_.maxSizeReached()) {
+buffer_.flushAndReplace(session, Failure, nullptr);
+  }

Review comment:
 

[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1188: MINIFICPP-1651: Added DefragmentText processor

2021-10-27 Thread GitBox


fgerlits commented on a change in pull request #1188:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1188#discussion_r737495323



##
File path: extensions/standard-processors/processors/DefragmentText.cpp
##
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "DefragmentText.h"
+
+#include 
+
+#include "core/Resource.h"
+#include "serialization/PayloadSerializer.h"
+#include "TextFragmentUtils.h"
+#include "utils/gsl.h"
+#include "utils/StringUtils.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+// Relationships
+const core::Relationship DefragmentText::Success("success", "Flowfiles that have been successfully defragmented");
+const core::Relationship DefragmentText::Failure("failure", "Flowfiles that failed the defragmentation process");
+const core::Relationship DefragmentText::Self("__self__", "Marks the FlowFile to be owned by this processor");
+
+// Properties
+const core::Property DefragmentText::Pattern(
+    core::PropertyBuilder::createProperty("Pattern")
+        ->withDescription("A regular expression to match at the start or end of messages.")
+        ->isRequired(true)->build());
+
+const core::Property DefragmentText::PatternLoc(
+    core::PropertyBuilder::createProperty("Pattern Location")
+        ->withDescription("Whether the pattern is located at the start or at the end of the messages.")
+        ->withAllowableValues(PatternLocation::values())
+        ->withDefaultValue(toString(PatternLocation::START_OF_MESSAGE))->build());
+
+const core::Property DefragmentText::MaxBufferSize(
+    core::PropertyBuilder::createProperty("Max Buffer Size")
+        ->withDescription("The maximum buffer size, if the buffer exceeds this, it will be transferred to failure. Expected format is <size> <size unit>")
+        ->withType(core::StandardValidators::get().DATA_SIZE_VALIDATOR)->build());
+
+// Validated like Max Buffer Size, so that malformed durations are rejected at configuration time
+const core::Property DefragmentText::MaxBufferAge(
+    core::PropertyBuilder::createProperty("Max Buffer Age")
+        ->withDescription("The maximum age of a buffer after which the buffer will be transferred to failure. Expected format is <duration> <time unit>")
+        ->withType(core::StandardValidators::get().TIME_PERIOD_VALIDATOR)->build());
+
+void DefragmentText::initialize() {
+  // Advertise the configurable properties first, then the Success and Failure relationships.
+  setSupportedProperties({Pattern, PatternLoc, MaxBufferAge, MaxBufferSize});
+  setSupportedRelationships({Success, Failure});
+}
+
+void DefragmentText::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory*) {
+  gsl_Expects(context);
+
+  // Max Buffer Age is optional; a value that cannot be parsed as a time period is reported and ignored.
+  std::string max_buffer_age_str;
+  if (context->getProperty(MaxBufferAge.getName(), max_buffer_age_str)) {
+    core::TimeUnit unit;
+    uint64_t max_buffer_age;
+    if (core::Property::StringToTime(max_buffer_age_str, max_buffer_age, unit) && core::Property::ConvertTimeUnitToMS(max_buffer_age, unit, max_buffer_age)) {
+      buffer_.setMaxAge(std::chrono::milliseconds(max_buffer_age));
+      logger_->log_trace("The Buffer maximum age is configured to be %" PRIu64 " ms", max_buffer_age);
+    } else {
+      logger_->log_warn("Ignoring invalid %s value: %s", MaxBufferAge.getName(), max_buffer_age_str);
+    }
+  }
+
+  // Max Buffer Size is optional; a size of 0 leaves the buffer's size limit unset.
+  std::string max_buffer_size_str;
+  if (context->getProperty(MaxBufferSize.getName(), max_buffer_size_str)) {
+    uint64_t max_buffer_size = core::DataSizeValue(max_buffer_size_str).getValue();
+    if (max_buffer_size > 0) {
+      buffer_.setMaxSize(max_buffer_size);
+      logger_->log_trace("The Buffer maximum size is configured to be %" PRIu64 " B", max_buffer_size);
+    }
+  }
+
+  context->getProperty(PatternLoc.getName(), pattern_location_);
+
+  // Pattern is required: an empty value or a malformed regular expression prevents scheduling.
+  std::string pattern_str;
+  if (!context->getProperty(Pattern.getName(), pattern_str) || pattern_str.empty()) {
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Pattern property missing or invalid");
+  }
+  try {
+    pattern_ = std::regex(pattern_str);
+  } catch (const std::regex_error& error) {
+    // std::regex reports a malformed pattern by throwing; turn it into a scheduling failure
+    // instead of letting std::regex_error escape onSchedule.
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, std::string("Pattern property missing or invalid: ") + error.what());
+  }
+  logger_->log_trace("The Pattern is configured to be %s", pattern_str);
+}
+
+void DefragmentText::onTrigger(core::ProcessContext*, core::ProcessSession* session) {
+  gsl_Expects(session);
+
+  // Flow files returned by flow_file_store_.getNewFlowFiles() are processed before the session's next input.
+  auto pending_fragments = flow_file_store_.getNewFlowFiles();
+  for (auto& pending_fragment : pending_fragments) {
+    processNextFragment(session, pending_fragment);
+  }
+
+  // NOTE(review): session->get() presumably yields an empty pointer when nothing is queued,
+  // so processNextFragment is expected to tolerate a null flow file — confirm.
+  auto incoming_fragment = session->get();
+  processNextFragment(session, incoming_fragment);
+
+  // When either buffer limit is reached, the buffered content is flushed to the Failure relationship.
+  const bool buffer_limit_reached = buffer_.maxAgeReached() || buffer_.maxSizeReached();
+  if (buffer_limit_reached) {
+    buffer_.flushAndReplace(session, Failure, nullptr);
+  }
+}
+
+void 

[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1188: MINIFICPP-1651: Added DefragmentText processor

2021-10-26 Thread GitBox


fgerlits commented on a change in pull request #1188:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1188#discussion_r735408467



##
File path: PROCESSORS.md
##
@@ -295,6 +296,29 @@ In the list below, the names of required properties appear 
in bold. Any other pr
 | - | - |
 |success|FlowFiles that are sent successfully to the destination are 
transferred to this relationship|
 
+## DefragmentText
+
+### Description
+
+DefragmentText splits and merges incoming flowfiles so that cohesive messages are not split between them.
+
+### Properties
+
+In the list below, the names of required properties appear in bold. Any other 
properties (not in bold) are considered optional. The table also indicates any 
default values, and whether a property supports the NiFi Expression Language.
+
+| Name | Default Value | Allowable Values | Description |
+| - | - | - | - |
+|**Pattern**|||A regular expression to match at the start or end of messages.|
+|Pattern Location|Start of Message|Start of Message<br/>End of Message|Whether the pattern is located at the start or at the end of the messages.|
+|Max Buffer Age||\<duration\> \<time unit\>|The maximum age of a buffer after which the buffer will be transferred to failure.|
+|Max Buffer Size||\<size\> \<size unit\>|The maximum buffer size; if the buffer exceeds this, it will be transferred to failure.|
+
+### Relationships
+
+| Name | Description |
+| - | - |
+|success|Flowfiles that have no fragmented messages in them|

Review comment:
   "... or have been successfully defragmented"?

##
File path: docker/test/integration/minifi/processors/DefragmentText.py
##
@@ -0,0 +1,9 @@
+from ..core.Processor import Processor
+
+
+class DefragmentText(Processor):
+def __init__(self, delimiter="<[0-9]+>", schedule={'scheduling period': '2 
sec'}):

Review comment:
   I think it would make more sense to make it event driven by default:
   ```suggestion
   def __init__(self, delimiter="<[0-9]+>", schedule={'scheduling 
strategy': 'EVENT_DRIVEN'}):
   ```

##
File path: extensions/standard-processors/processors/DefragmentText.cpp
##
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "DefragmentText.h"
+
+#include 
+
+#include "core/Resource.h"
+#include "serialization/PayloadSerializer.h"
+#include "TextFragmentUtils.h"
+#include "utils/gsl.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+// Relationships
+const core::Relationship DefragmentText::Success("success", "Flowfiles that have no fragmented messages in them");
+const core::Relationship DefragmentText::Failure("failure", "Flowfiles that failed the defragmentation process");
+const core::Relationship DefragmentText::Self("__self__", "Marks the FlowFile to be owned by this processor");
+
+// Properties
+const core::Property DefragmentText::Pattern(
+    core::PropertyBuilder::createProperty("Pattern")
+        ->withDescription("A regular expression to match at the start or end of messages.")
+        ->isRequired(true)->build());
+
+const core::Property DefragmentText::PatternLoc(
+    core::PropertyBuilder::createProperty("Pattern Location")
+        ->withDescription("Whether the pattern is located at the start or at the end of the messages.")
+        ->withAllowableValues(PatternLocation::values())
+        ->withDefaultValue(toString(PatternLocation::START_OF_MESSAGE))->build());
+
+const core::Property DefragmentText::MaxBufferSize(
+    core::PropertyBuilder::createProperty("Max Buffer Size")
+        ->withDescription("The maximum buffer size, if the buffer exceeds this, it will be transferred to failure. Expected format is <size> <size unit>")
+        ->withType(core::StandardValidators::get().DATA_SIZE_VALIDATOR)->build());
+
+// Validated like Max Buffer Size, so that malformed durations are rejected at configuration time
+const core::Property DefragmentText::MaxBufferAge(
+    core::PropertyBuilder::createProperty("Max Buffer Age")
+        ->withDescription("The maximum age of a buffer after which the buffer will be transferred to failure. Expected format is <duration> <time unit>")
+        ->withType(core::StandardValidators::get().TIME_PERIOD_VALIDATOR)->build());
+
+void DefragmentText::initialize() {
+  // Advertise the configurable properties first, then the Success and Failure relationships.
+  setSupportedProperties({Pattern, PatternLoc, MaxBufferAge, MaxBufferSize});
+  setSupportedRelationships({Success, Failure});
+}
+
+void DefragmentText::onSchedule(core::ProcessContext* context,