[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1188: MINIFICPP-1651: Added DefragmentText processor
fgerlits commented on a change in pull request #1188: URL: https://github.com/apache/nifi-minifi-cpp/pull/1188#discussion_r739234364 ## File path: extensions/standard-processors/tests/unit/DefragmentTextTests.cpp ## @@ -0,0 +1,273 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "TestBase.h" +#include "WriteToFlowFileTestProcessor.h" +#include "ReadFromFlowFileTestProcessor.h" +#include "UpdateAttribute.h" +#include "DefragmentText.h" +#include "TextFragmentUtils.h" +#include "utils/TestUtils.h" +#include "serialization/PayloadSerializer.h" +#include "serialization/FlowFileSerializer.h" +#include "unit/ContentRepositoryDependentTests.h" + +using WriteToFlowFileTestProcessor = org::apache::nifi::minifi::processors::WriteToFlowFileTestProcessor; +using ReadFromFlowFileTestProcessor = org::apache::nifi::minifi::processors::ReadFromFlowFileTestProcessor; +using UpdateAttribute = org::apache::nifi::minifi::processors::UpdateAttribute; +using DefragmentText = org::apache::nifi::minifi::processors::DefragmentText; + +TEST_CASE("DefragTextFlowFilesNoMultilinePatternAtStartTest", "[defragmenttextnomultilinepatternatstarttest]") { + TestController testController; + std::shared_ptr plan = testController.createPlan(); + std::shared_ptr write_to_flow_file = std::dynamic_pointer_cast( + plan->addProcessor("WriteToFlowFileTestProcessor", "write_to_flow_file")); + std::shared_ptr defrag_text_flow_files = std::dynamic_pointer_cast( + plan->addProcessor("DefragmentText", "defrag_text_flow_files", core::Relationship("success", "description"), true)); + std::shared_ptr read_from_flow_file = std::dynamic_pointer_cast( + plan->addProcessor("ReadFromFlowFileTestProcessor", "read_from_flow_file", DefragmentText::Success, true)); + plan->setProperty(defrag_text_flow_files, DefragmentText::Pattern.getName(), "<[0-9]+>"); + + + write_to_flow_file->setContent("<1> Foo"); + testController.runSession(plan); + CHECK(read_from_flow_file->numberOfFlowFilesRead() == 0); + write_to_flow_file->setContent("<2> Bar"); + plan->reset(); + testController.runSession(plan); + CHECK(read_from_flow_file->readFlowFileWithContent("<1> Foo")); + write_to_flow_file->setContent("<3> Baz"); + plan->reset(); + testController.runSession(plan); + 
CHECK(read_from_flow_file->readFlowFileWithContent("<2> Bar")); +} + +TEST_CASE("DefragmentTextEmptyPattern", "[defragmenttextemptypattern]") { + TestController testController; + std::shared_ptr plan = testController.createPlan(); + std::shared_ptr write_to_flow_file = std::dynamic_pointer_cast( + plan->addProcessor("WriteToFlowFileTestProcessor", "write_to_flow_file")); + std::shared_ptr defrag_text_flow_files = std::dynamic_pointer_cast( + plan->addProcessor("DefragmentText", "defrag_text_flow_files", core::Relationship("success", "description"), true)); + std::shared_ptr read_from_flow_file = std::dynamic_pointer_cast( + plan->addProcessor("ReadFromFlowFileTestProcessor", "read_from_flow_file", DefragmentText::Success, true)); + plan->setProperty(defrag_text_flow_files, DefragmentText::Pattern.getName(), ""); + plan->setProperty(defrag_text_flow_files, DefragmentText::PatternLoc.getName(), toString(DefragmentText::PatternLocation::END_OF_MESSAGE)); + + REQUIRE_THROWS_WITH(testController.runSession(plan), "Process Schedule Operation: Pattern property missing or invalid"); +} + +TEST_CASE("DefragmentTextNoMultilinePatternAtEndTest", "[defragmenttextnomultilinepatternatendtest]") { + TestController testController; + std::shared_ptr plan = testController.createPlan(); + std::shared_ptr write_to_flow_file = std::dynamic_pointer_cast( + plan->addProcessor("WriteToFlowFileTestProcessor", "write_to_flow_file")); + std::shared_ptr defrag_text_flow_files = std::dynamic_pointer_cast( + plan->addProcessor("DefragmentText", "defrag_text_flow_files", core::Relationship("success", "description"), true)); + std::shared_ptr read_from_flow_file = std::dynamic_pointer_cast( + plan->addProcessor("ReadFromFlowFileTestProcessor", "read_from_flow_file", DefragmentText::Success, true)); Review comment: Using sections would require
[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1188: MINIFICPP-1651: Added DefragmentText processor
fgerlits commented on a change in pull request #1188: URL: https://github.com/apache/nifi-minifi-cpp/pull/1188#discussion_r739169606 ## File path: extensions/standard-processors/processors/DefragmentText.cpp ## @@ -0,0 +1,323 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "DefragmentText.h" + +#include + +#include "core/Resource.h" +#include "serialization/PayloadSerializer.h" +#include "TextFragmentUtils.h" +#include "utils/gsl.h" +#include "utils/StringUtils.h" + +namespace org::apache::nifi::minifi::processors { + +const core::Relationship DefragmentText::Success("success", "Flowfiles that have been successfully defragmented"); +const core::Relationship DefragmentText::Failure("failure", "Flowfiles that failed the defragmentation process"); +const core::Relationship DefragmentText::Self("__self__", "Marks the FlowFile to be owned by this processor"); + +const core::Property DefragmentText::Pattern( +core::PropertyBuilder::createProperty("Pattern") +->withDescription("A regular expression to match at the start or end of messages.") +->isRequired(true)->build()); + +const core::Property DefragmentText::PatternLoc( +core::PropertyBuilder::createProperty("Pattern Location")->withDescription("Whether the pattern is located at the start or at the end of the messages.") +->withAllowableValues(PatternLocation::values()) + ->withDefaultValue(toString(PatternLocation::START_OF_MESSAGE))->build()); + + +const core::Property DefragmentText::MaxBufferSize( +core::PropertyBuilder::createProperty("Max Buffer Size") +->withDescription("The maximum buffer size, if the buffer exceeds this, it will be transferred to failure. Expected format is ") + ->withType(core::StandardValidators::get().DATA_SIZE_VALIDATOR)->build()); + +const core::Property DefragmentText::MaxBufferAge( +core::PropertyBuilder::createProperty("Max Buffer Age")-> +withDescription("The maximum age of a buffer after which the buffer will be transferred to failure. 
Expected format is ")->build()); + +void DefragmentText::initialize() { + setSupportedRelationships({Success, Failure}); + setSupportedProperties({Pattern, PatternLoc, MaxBufferAge, MaxBufferSize}); +} + +void DefragmentText::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory*) { + gsl_Expects(context); + + std::string max_buffer_age_str; + if (context->getProperty(MaxBufferAge.getName(), max_buffer_age_str)) { +core::TimeUnit unit; +uint64_t max_buffer_age; +if (core::Property::StringToTime(max_buffer_age_str, max_buffer_age, unit) && core::Property::ConvertTimeUnitToMS(max_buffer_age, unit, max_buffer_age)) { + buffer_.setMaxAge(std::chrono::milliseconds(max_buffer_age)); + logger_->log_trace("The Buffer maximum age is configured to be %" PRIu64 " ms", max_buffer_age); +} + } + + std::string max_buffer_size_str; + if (context->getProperty(MaxBufferSize.getName(), max_buffer_size_str)) { +uint64_t max_buffer_size = core::DataSizeValue(max_buffer_size_str).getValue(); +if (max_buffer_size > 0) { + buffer_.setMaxSize(max_buffer_size); + logger_->log_trace("The Buffer maximum size is configured to be %" PRIu64 " B", max_buffer_size); +} + } + + context->getProperty(PatternLoc.getName(), pattern_location_); + + std::string pattern_str; + if (context->getProperty(Pattern.getName(), pattern_str) && !pattern_str.empty()) { +pattern_ = std::regex(pattern_str); +logger_->log_trace("The Pattern is configured to be %s", pattern_str); + } else { +throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Pattern property missing or invalid"); + } +} + +void DefragmentText::onTrigger(core::ProcessContext*, core::ProcessSession* session) { + gsl_Expects(session); + auto flowFiles = flow_file_store_.getNewFlowFiles(); + for (auto& file : flowFiles) { +processNextFragment(session, file); + } + std::shared_ptr original_flow_file = session->get(); + processNextFragment(session, original_flow_file); + if (buffer_.maxAgeReached() || buffer_.maxSizeReached()) { 
+buffer_.flushAndReplace(session, Failure, nullptr); + } Review comment:
[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1188: MINIFICPP-1651: Added DefragmentText processor
fgerlits commented on a change in pull request #1188: URL: https://github.com/apache/nifi-minifi-cpp/pull/1188#discussion_r737495323 ## File path: extensions/standard-processors/processors/DefragmentText.cpp ## @@ -0,0 +1,323 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "DefragmentText.h" + +#include + +#include "core/Resource.h" +#include "serialization/PayloadSerializer.h" +#include "TextFragmentUtils.h" +#include "utils/gsl.h" +#include "utils/StringUtils.h" + +namespace org::apache::nifi::minifi::processors { + +const core::Relationship DefragmentText::Success("success", "Flowfiles that have been successfully defragmented"); +const core::Relationship DefragmentText::Failure("failure", "Flowfiles that failed the defragmentation process"); +const core::Relationship DefragmentText::Self("__self__", "Marks the FlowFile to be owned by this processor"); + +const core::Property DefragmentText::Pattern( +core::PropertyBuilder::createProperty("Pattern") +->withDescription("A regular expression to match at the start or end of messages.") +->isRequired(true)->build()); + +const core::Property DefragmentText::PatternLoc( +core::PropertyBuilder::createProperty("Pattern Location")->withDescription("Whether the pattern is located at the start or at the end of the messages.") +->withAllowableValues(PatternLocation::values()) + ->withDefaultValue(toString(PatternLocation::START_OF_MESSAGE))->build()); + + +const core::Property DefragmentText::MaxBufferSize( +core::PropertyBuilder::createProperty("Max Buffer Size") +->withDescription("The maximum buffer size, if the buffer exceeds this, it will be transferred to failure. Expected format is ") + ->withType(core::StandardValidators::get().DATA_SIZE_VALIDATOR)->build()); + +const core::Property DefragmentText::MaxBufferAge( +core::PropertyBuilder::createProperty("Max Buffer Age")-> +withDescription("The maximum age of a buffer after which the buffer will be transferred to failure. 
Expected format is ")->build()); + +void DefragmentText::initialize() { + setSupportedRelationships({Success, Failure}); + setSupportedProperties({Pattern, PatternLoc, MaxBufferAge, MaxBufferSize}); +} + +void DefragmentText::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory*) { + gsl_Expects(context); + + std::string max_buffer_age_str; + if (context->getProperty(MaxBufferAge.getName(), max_buffer_age_str)) { +core::TimeUnit unit; +uint64_t max_buffer_age; +if (core::Property::StringToTime(max_buffer_age_str, max_buffer_age, unit) && core::Property::ConvertTimeUnitToMS(max_buffer_age, unit, max_buffer_age)) { + buffer_.setMaxAge(std::chrono::milliseconds(max_buffer_age)); + logger_->log_trace("The Buffer maximum age is configured to be %" PRIu64 " ms", max_buffer_age); +} + } + + std::string max_buffer_size_str; + if (context->getProperty(MaxBufferSize.getName(), max_buffer_size_str)) { +uint64_t max_buffer_size = core::DataSizeValue(max_buffer_size_str).getValue(); +if (max_buffer_size > 0) { + buffer_.setMaxSize(max_buffer_size); + logger_->log_trace("The Buffer maximum size is configured to be %" PRIu64 " B", max_buffer_size); +} + } + + context->getProperty(PatternLoc.getName(), pattern_location_); + + std::string pattern_str; + if (context->getProperty(Pattern.getName(), pattern_str) && !pattern_str.empty()) { +pattern_ = std::regex(pattern_str); +logger_->log_trace("The Pattern is configured to be %s", pattern_str); + } else { +throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Pattern property missing or invalid"); + } +} + +void DefragmentText::onTrigger(core::ProcessContext*, core::ProcessSession* session) { + gsl_Expects(session); + auto flowFiles = flow_file_store_.getNewFlowFiles(); + for (auto& file : flowFiles) { +processNextFragment(session, file); + } + std::shared_ptr original_flow_file = session->get(); + processNextFragment(session, original_flow_file); + if (buffer_.maxAgeReached() || buffer_.maxSizeReached()) { 
+buffer_.flushAndReplace(session, Failure, nullptr); + } +} + +void
[GitHub] [nifi-minifi-cpp] fgerlits commented on a change in pull request #1188: MINIFICPP-1651: Added DefragmentText processor
fgerlits commented on a change in pull request #1188: URL: https://github.com/apache/nifi-minifi-cpp/pull/1188#discussion_r735408467 ## File path: PROCESSORS.md ## @@ -295,6 +296,29 @@ In the list below, the names of required properties appear in bold. Any other pr | - | - | |success|FlowFiles that are sent successfully to the destination are transferred to this relationship| +## DefragmentText + +### Description + +DefragmentText splits and merges incoming flowfiles so cohesive messages are not split between them. +### Properties + +In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language. + +| Name | Default Value | Allowable Values | Description | +| - | - | - | - | +|**Pattern**|||A regular expression to match at the start or end of messages.| +|Pattern Location|Start of Message|Start of Message<br>End of Message|Whether the pattern is located at the start or at the end of the messages.| +|Max Buffer Age||duration time unit|The maximum age of a buffer after which the buffer will be transferred to failure.| +|Max Buffer Size||size size unit|The maximum buffer size, if the buffer exceeds this, it will be transferred to failure.| + +### Relationships + +| Name | Description | +| - | - | +|success|Flowfiles that have no fragmented messages in them| Review comment: "... or have been successfully defragmented"?
## File path: docker/test/integration/minifi/processors/DefragmentText.py ## @@ -0,0 +1,9 @@ +from ..core.Processor import Processor + + +class DefragmentText(Processor): +def __init__(self, delimiter="<[0-9]+>", schedule={'scheduling period': '2 sec'}): Review comment: I think it would make more sense to make it event driven by default: ```suggestion def __init__(self, delimiter="<[0-9]+>", schedule={'scheduling strategy': 'EVENT_DRIVEN'}): ``` ## File path: extensions/standard-processors/processors/DefragmentText.cpp ## @@ -0,0 +1,340 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "DefragmentText.h" + +#include + +#include "core/Resource.h" +#include "serialization/PayloadSerializer.h" +#include "TextFragmentUtils.h" +#include "utils/gsl.h" + +namespace org::apache::nifi::minifi::processors { + +const core::Relationship DefragmentText::Success("success", "Flowfiles that have no fragmented messages in them"); +const core::Relationship DefragmentText::Failure("failure", "Flowfiles that failed the defragmentation process"); +const core::Relationship DefragmentText::Self("__self__", "Marks the FlowFile to be owned by this processor"); + +const core::Property DefragmentText::Pattern( +core::PropertyBuilder::createProperty("Pattern") +->withDescription("A regular expression to match at the start or end of messages.") +->isRequired(true)->build()); + +const core::Property DefragmentText::PatternLoc( +core::PropertyBuilder::createProperty("Pattern Location")->withDescription("Whether the pattern is located at the start or at the end of the messages.") +->withAllowableValues(PatternLocation::values()) + ->withDefaultValue(toString(PatternLocation::START_OF_MESSAGE))->build()); + + +const core::Property DefragmentText::MaxBufferSize( +core::PropertyBuilder::createProperty("Max Buffer Size") +->withDescription("The maximum buffer size, if the buffer exceeds this, it will be transferred to failure. Expected format is ") + ->withType(core::StandardValidators::get().DATA_SIZE_VALIDATOR)->build()); + +const core::Property DefragmentText::MaxBufferAge( +core::PropertyBuilder::createProperty("Max Buffer Age")-> +withDescription("The maximum age of a buffer after which the buffer will be transferred to failure. Expected format is ")->build()); + +void DefragmentText::initialize() { + setSupportedRelationships({Success, Failure}); + setSupportedProperties({Pattern, PatternLoc, MaxBufferAge, MaxBufferSize}); +} + +void DefragmentText::onSchedule(core::ProcessContext* context,