nifi git commit: NIFI-5761 ReplaceText processor can stop processing data if it evaluates invalid expressions
Repository: nifi Updated Branches: refs/heads/master fdb1fd1a6 -> 1f2cf4bc6 NIFI-5761 ReplaceText processor can stop processing data if it evaluates invalid expressions NIFI-5761 Code review. Remove startsWidth to check an exception. Added the dependency as provided. NIFI-5761 Code review. Remove provided. NIFI-5761 Code review. Improve logging. This closes #3112. Signed-off-by: Koji Kawamura Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/1f2cf4bc Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/1f2cf4bc Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/1f2cf4bc Branch: refs/heads/master Commit: 1f2cf4bc6128e2a09dcc378bdf66b46302d44754 Parents: fdb1fd1 Author: Gardella Juan Pablo Authored: Fri Oct 26 16:41:26 2018 -0300 Committer: Koji Kawamura Committed: Wed Oct 31 12:01:16 2018 +0900 -- .../nifi-standard-processors/pom.xml| 4 .../nifi/processors/standard/ReplaceText.java | 10 +--- .../processors/standard/TestReplaceText.java| 25 nifi-nar-bundles/nifi-standard-bundle/pom.xml | 5 4 files changed, 41 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/nifi/blob/1f2cf4bc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml -- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index ed86dfd..620c570 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -26,6 +26,10 @@ org.apache.nifi +nifi-expression-language + + +org.apache.nifi nifi-processor-utils 1.9.0-SNAPSHOT http://git-wip-us.apache.org/repos/asf/nifi/blob/1f2cf4bc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java -- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java index 2a9a6fe..3108a6c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java @@ -26,6 +26,8 @@ import org.apache.nifi.annotation.behavior.SystemResource; import org.apache.nifi.annotation.behavior.SystemResourceConsideration; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageException; +import org.apache.nifi.attribute.expression.language.exception.IllegalAttributeException; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; @@ -297,17 +299,19 @@ public class ReplaceText extends AbstractProcessor { } catch (StackOverflowError e) { // Some regular expressions can produce many matches on large input data size using recursive code // do not log the StackOverflowError stack trace -logger.info("Transferred {} to 'failure' due to {}", new Object[] {flowFile, e.toString()}); +logger.info("Transferred {} to 'failure' due to {}", new Object[] { flowFile, e.toString() }); +session.transfer(flowFile, REL_FAILURE); +return; +} catch (IllegalAttributeException | AttributeExpressionLanguageException e) { +logger.warn("Transferred {} to 'failure' due to {}", new Object[] { flowFile, e.toString() }, e); session.transfer(flowFile, REL_FAILURE); return; } - logger.info("Transferred {} to 'success'", new Object[] {flowFile}); session.getProvenanceReporter().modifyContent(flowFile, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); session.transfer(flowFile, REL_SUCCESS); } - // If we find a back reference that is not valid, then we will treat it as a literal string. For example, if we have 3 capturing // groups and the Replacement Value has the value is "I owe $8 to him", then we want to treat the $8 as a literal "$8", rather // than
nifi-minifi-cpp git commit: MINIFICPP-640 - C API: how to support dynamic properties?
Repository: nifi-minifi-cpp Updated Branches: refs/heads/master 2fb4021e1 -> fc1074a04 MINIFICPP-640 - C API: how to support dynamic properties? This closes #430. Signed-off-by: Marc Parisi Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/fc1074a0 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/fc1074a0 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/fc1074a0 Branch: refs/heads/master Commit: fc1074a0495da8947fa8f6fa8a0fb258da2c13b3 Parents: 2fb4021 Author: Arpad Boda Authored: Tue Oct 30 14:48:04 2018 +0100 Committer: Marc Parisi Committed: Tue Oct 30 15:09:31 2018 -0400 -- libminifi/src/capi/api.cpp| 3 ++- libminifi/test/capi/CAPITests.cpp | 11 --- 2 files changed, 10 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fc1074a0/libminifi/src/capi/api.cpp -- diff --git a/libminifi/src/capi/api.cpp b/libminifi/src/capi/api.cpp index 58328ef..e135fe1 100644 --- a/libminifi/src/capi/api.cpp +++ b/libminifi/src/capi/api.cpp @@ -412,7 +412,8 @@ int set_failure_strategy(flow *flow, FailureStrategy strategy) { int set_property(processor *proc, const char *name, const char *value) { if (name != nullptr && value != nullptr && proc != nullptr) { core::Processor *p = static_cast(proc->processor_ptr); -return p->setProperty(name, value) ? 0 : -2; +bool success = p->setProperty(name, value) || (p->supportsDynamicProperties() && p->setDynamicProperty(name, value)); +return success ? 0 : -2; } return -1; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/fc1074a0/libminifi/test/capi/CAPITests.cpp -- diff --git a/libminifi/test/capi/CAPITests.cpp b/libminifi/test/capi/CAPITests.cpp index 368c9a1..b7bf784 100644 --- a/libminifi/test/capi/CAPITests.cpp +++ b/libminifi/test/capi/CAPITests.cpp @@ -165,10 +165,10 @@ TEST_CASE("Test manipulation of attributes", "[testAttributes]") { processor *extract_test = add_processor_with_linkage(test_flow, "ExtractText"); REQUIRE(extract_test != nullptr); REQUIRE(set_property(extract_test, "Attribute", "TestAttr") == 0); - /*processor *update_attribute = add_processor_with_linkage(test_flow, "UpdateAttribute"); - REQUIRE(update_attribute != nullptr); + processor *update_attr = add_processor_with_linkage(test_flow, "UpdateAttribute"); + REQUIRE(update_attr != nullptr); - REQUIRE(set_property(update_attribute, "TestAttribute", "TestValue") == 0);*/ + REQUIRE(set_property(update_attr, "UpdatedAttribute", "UpdatedValue") == 0); flow_file_record *record = get_next_flow_file(instance, test_flow); @@ -203,12 +203,17 @@ TEST_CASE("Test manipulation of attributes", "[testAttributes]") { REQUIRE(get_all_attributes(record, _set) == attr_set.size); bool test_attr_found = false; + bool updated_attr_found = false; for (int i = 0; i < attr_set.size; ++i) { if (strcmp(attr_set.attributes[i].key, test_attr.key) == 0) { test_attr_found = true; REQUIRE(std::string(static_cast(attr_set.attributes[i].value), attr_set.attributes[i].value_size) == new_testattr_value); +} else if (strcmp(attr_set.attributes[i].key, "UpdatedAttribute") == 0) { + updated_attr_found = true; + REQUIRE(std::string(static_cast(attr_set.attributes[i].value), attr_set.attributes[i].value_size) == "UpdatedValue"); } } + REQUIRE(updated_attr_found == true); REQUIRE(test_attr_found == true); free_flowfile(record);
nifi git commit: NIFI-5766 Make formatting in User Guide consistent with Admin Guide
Repository: nifi Updated Branches: refs/heads/master db966cf34 -> fdb1fd1a6 NIFI-5766 Make formatting in User Guide consistent with Admin Guide This closes #3115 Signed-off-by: Scott Aslan Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/fdb1fd1a Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/fdb1fd1a Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/fdb1fd1a Branch: refs/heads/master Commit: fdb1fd1a64d85caefb6be328d1e169c57b5a9777 Parents: db966cf Author: Andrew Lim Authored: Tue Oct 30 11:46:32 2018 -0400 Committer: Scott Aslan Committed: Tue Oct 30 12:04:29 2018 -0400 -- .../src/main/asciidoc/images/iconDetails.png| Bin 362 -> 704 bytes .../asciidoc/images/iconDownloadTemplate.png| Bin 0 -> 929 bytes nifi-docs/src/main/asciidoc/user-guide.adoc | 117 +-- 3 files changed, 57 insertions(+), 60 deletions(-) -- http://git-wip-us.apache.org/repos/asf/nifi/blob/fdb1fd1a/nifi-docs/src/main/asciidoc/images/iconDetails.png -- diff --git a/nifi-docs/src/main/asciidoc/images/iconDetails.png b/nifi-docs/src/main/asciidoc/images/iconDetails.png index f4fb4d4..28166de 100644 Binary files a/nifi-docs/src/main/asciidoc/images/iconDetails.png and b/nifi-docs/src/main/asciidoc/images/iconDetails.png differ http://git-wip-us.apache.org/repos/asf/nifi/blob/fdb1fd1a/nifi-docs/src/main/asciidoc/images/iconDownloadTemplate.png -- diff --git a/nifi-docs/src/main/asciidoc/images/iconDownloadTemplate.png b/nifi-docs/src/main/asciidoc/images/iconDownloadTemplate.png new file mode 100644 index 000..1bfc1cf Binary files /dev/null and b/nifi-docs/src/main/asciidoc/images/iconDownloadTemplate.png differ http://git-wip-us.apache.org/repos/asf/nifi/blob/fdb1fd1a/nifi-docs/src/main/asciidoc/user-guide.adoc -- diff --git a/nifi-docs/src/main/asciidoc/user-guide.adoc b/nifi-docs/src/main/asciidoc/user-guide.adoc index 09e22b9..c69b771 100644 --- a/nifi-docs/src/main/asciidoc/user-guide.adoc +++ b/nifi-docs/src/main/asciidoc/user-guide.adoc @@ -119,13 +119,13 @@ UI may become unavailable. As a result, several components may be combined together to make a larger building block from which to create a dataflow. These templates can also be exported as XML and imported into another NiFi instance, allowing these building blocks to be shared. -*flow.xml.gz*: Everything the DFM puts onto the NiFi User Interface canvas is written, in real time, to one file called the flow.xml.gz. This file is located in the nifi/conf directory by default. - Any change made on the canvas is automatically saved to this file, without the user needing to click a "save" button. +*flow.xml.gz*: Everything the DFM puts onto the NiFi User Interface canvas is written, in real time, to one file called the _flow.xml.gz_. This file is located in the `nifi/conf` directory by default. + Any change made on the canvas is automatically saved to this file, without the user needing to click a "Save" button. In addition, NiFi automatically creates a backup copy of this file in the archive directory when it is updated. - You can use these archived files to rollback flow configuration. To do so, stop NiFi, replace flow.xml.gz with a desired backup copy, then restart NiFi. - In a clustered environment, stop the entire NiFi cluster, replace the flow.xml.gz of one of nodes, and restart the node. Remove flow.xml.gz from other nodes. + You can use these archived files to rollback flow configuration. To do so, stop NiFi, replace _flow.xml.gz_ with a desired backup copy, then restart NiFi. + In a clustered environment, stop the entire NiFi cluster, replace the _flow.xml.gz_ of one of nodes, and restart the node. Remove _flow.xml.gz_ from other nodes. Once you confirmed the node starts up as a one-node cluster, start the other nodes. The replaced flow configuration will be synchronized across the cluster. - The name and location of flow.xml.gz, and auto archive behavior are configurable. See the link:administration-guide.html#core-properties-br[System Administratorâs Guide] for further details. + The name and location of _flow.xml.gz_, and auto archive behavior are configurable. See the link:administration-guide.html#core-properties-br[System Administratorâs Guide] for further details. @@ -441,7 +441,7 @@ image:iconLabel.png["Label"] *Label*: Labels are used to provide documentation to parts of a dataflow. When a Label is dropped onto the canvas, it is created with a default size. The Label can then be
nifi-minifi-cpp git commit: MINIFICPP-654 - C API: failure callback improvements
Repository: nifi-minifi-cpp Updated Branches: refs/heads/master 0043155dc -> 2fb4021e1 MINIFICPP-654 - C API: failure callback improvements This closes #429. Signed-off-by: Marc Parisi Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/2fb4021e Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/2fb4021e Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/2fb4021e Branch: refs/heads/master Commit: 2fb4021e1b93eaa7ae491a13f88a17e466475a86 Parents: 0043155 Author: Arpad Boda Authored: Wed Oct 24 15:14:05 2018 +0200 Committer: Marc Parisi Committed: Tue Oct 30 07:29:30 2018 -0400 -- libminifi/include/capi/Plan.h | 77 ++ libminifi/include/capi/api.h | 13 +- libminifi/include/capi/cstructs.h | 6 ++- libminifi/src/capi/Plan.cpp | 31 -- libminifi/src/capi/api.cpp| 49 +++--- libminifi/test/capi/CAPITests.cpp | 12 -- 6 files changed, 129 insertions(+), 59 deletions(-) -- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2fb4021e/libminifi/include/capi/Plan.h -- diff --git a/libminifi/include/capi/Plan.h b/libminifi/include/capi/Plan.h index 08ad68a..75330a0 100644 --- a/libminifi/include/capi/Plan.h +++ b/libminifi/include/capi/Plan.h @@ -46,6 +46,41 @@ #include "capi/cstructs.h" #include "capi/api.h" +using failure_callback_type = std::function; +using content_repo_sptr = std::shared_ptr; + +namespace { + + void failureStrategyAsIs(core::ProcessSession *session, failure_callback_type user_callback, content_repo_sptr cr_ptr) { +auto ff = session->get(); +if (ff == nullptr) { + return; +} + +auto claim = ff->getResourceClaim(); + +if (claim != nullptr && user_callback != nullptr) { + claim->increaseFlowFileRecordOwnedCount(); + // create a flow file. + auto path = claim->getContentFullPath(); + auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize()); + ffr->attributes = ff->getAttributesPtr(); + ffr->ffp = ff.get(); + auto content_repo_ptr = static_cast*>(ffr->crp); + *content_repo_ptr = cr_ptr; + user_callback(ffr); +} +session->remove(ff); + } + + void failureStrategyRollback(core::ProcessSession *session, failure_callback_type user_callback, content_repo_sptr cr_ptr) { +session->rollback(); +failureStrategyAsIs(session, user_callback, cr_ptr); + } +} + +static const std::map> FailureStrategies = +{ { FailureStrategy::AS_IS, failureStrategyAsIs }, {FailureStrategy::ROLLBACK, failureStrategyRollback } }; class ExecutionPlan { public: @@ -67,7 +102,9 @@ class ExecutionPlan { bool runNextProcessor(std::function, const std::shared_ptr)> verify = nullptr); - bool setFailureCallback(void (*onerror_callback)(const flow_file_record*)); + bool setFailureCallback(failure_callback_type onerror_callback); + + bool setFailureStrategy(FailureStrategy start); std::set getProvenanceRecords(); @@ -100,37 +137,25 @@ class ExecutionPlan { protected: class FailureHandler { public: -FailureHandler() { +FailureHandler(content_repo_sptr cr_ptr) { callback_ = nullptr; + strategy_ = FailureStrategy::AS_IS; + content_repo_ = cr_ptr; } -void setCallback(void (*onerror_callback)(const flow_file_record*)) { +void setCallback(failure_callback_type onerror_callback) { callback_=onerror_callback; } -void operator()(const processor_session* ps) -{ +void setStrategy(FailureStrategy strat) { + strategy_ = strat; +} +void operator()(const processor_session* ps) { auto ses = static_cast(ps->session); - - auto ff = ses->get(); - if (ff == nullptr) { -return; - } - auto claim = ff->getResourceClaim(); - - if (claim != nullptr && callback_ != nullptr) { -// create a flow file. -auto path = claim->getContentFullPath(); -auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize()); -ffr->attributes = ff->getAttributesPtr(); -ffr->ffp = ff.get(); -callback_(ffr); - } - // This deletes the content of the flowfile as ff gets out of scope - // It's the users responsibility to copy all the data - ses->remove(ff); - + FailureStrategies.at(strategy_)(ses, callback_, content_repo_); } private: -void (*callback_)(const flow_file_record*); +failure_callback_type callback_; +FailureStrategy strategy_; +content_repo_sptr content_repo_; }; void finalize(); @@ -142,7 +167,7 @@ class ExecutionPlan { std::shared_ptr stream_factory; -