[nifi] branch master updated: NIFI-6271, fix issue that incoming flowfile attributes don't copy into output flowfiles when Output Batch Size is set
This is an automated email from the ASF dual-hosted git repository. ijokarumawak pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi.git The following commit(s) were added to refs/heads/master by this push: new fa1ed16 NIFI-6271, fix issue that incoming flowfile attributes don't copy into output flowfiles when Output Batch Size is set fa1ed16 is described below commit fa1ed16e2bfc389466941e3bfe36a7a6ff08403b Author: avseq1234 AuthorDate: Sun Jul 7 21:26:16 2019 +0800 NIFI-6271, fix issue that incoming flowfile attributes don't copy into output flowfiles when Output Batch Size is set NIFI-6271, fix incoming flowfile attributes don't copy into output flowfiles when Output Batch Size is set NIFI-6271, fix incoming flowfile attributes don't copy into output flowfiles when Output Batch Size is set replace getAttribute(uuid) with getAttribute(CoreAttributes.UUID.key() fix checkstyle violation This closes #3575. Signed-off-by: Koji Kawamura --- .../nifi/processors/standard/AbstractExecuteSQL.java | 12 +++- .../apache/nifi/processors/standard/TestExecuteSQL.java| 14 -- .../nifi/processors/standard/TestExecuteSQLRecord.java | 9 - 3 files changed, 31 insertions(+), 4 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java index 212febc..700e92e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractExecuteSQL.java @@ -62,6 +62,7 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor { public static final String RESULT_QUERY_FETCH_TIME = "executesql.query.fetchtime"; public static final String RESULTSET_INDEX = "executesql.resultset.index"; public static final String RESULT_ERROR_MESSAGE = "executesql.error.message"; +public static final String INPUT_FLOWFILE_UUID = "input.flowfile.uuid"; public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key(); public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key(); @@ -247,6 +248,8 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor { boolean hasUpdateCount = st.getUpdateCount() != -1; +Map inputFileAttrMap = fileToProcess == null ? null : fileToProcess.getAttributes(); +String inputFileUUID = fileToProcess == null ? null : fileToProcess.getAttribute(CoreAttributes.UUID.key()); while (hasResults || hasUpdateCount) { //getMoreResults() and execute() return false to indicate that the result of the statement is just a number and not a ResultSet if (hasResults) { @@ -262,9 +265,13 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor { resultSetFF = session.create(); } else { resultSetFF = session.create(fileToProcess); -resultSetFF = session.putAllAttributes(resultSetFF, fileToProcess.getAttributes()); } +if (inputFileAttrMap != null) { +resultSetFF = session.putAllAttributes(resultSetFF, inputFileAttrMap); +} + + try { resultSetFF = session.write(resultSetFF, out -> { try { @@ -283,6 +290,9 @@ public abstract class AbstractExecuteSQL extends AbstractProcessor { attributesToAdd.put(RESULT_QUERY_EXECUTION_TIME, String.valueOf(executionTimeElapsed)); attributesToAdd.put(RESULT_QUERY_FETCH_TIME, String.valueOf(fetchTimeElapsed)); attributesToAdd.put(RESULTSET_INDEX, String.valueOf(resultCount)); +if (inputFileUUID != null) { +attributesToAdd.put(INPUT_FLOWFILE_UUID, inputFileUUID); +} attributesToAdd.putAll(sqlWriter.getAttributesToAdd()); resultSetFF = session.putAllAttributes(resultSetFF, attributesToAdd); sqlWriter.updateCounters(session); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteSQL.java
[jira] [Commented] (MINIFI-504) Upgrade commons-deamon to 1.2.0 to make build green again
[ https://issues.apache.org/jira/browse/MINIFI-504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16881418#comment-16881418 ] Aldrin Piri commented on MINIFI-504: Hey [~shonzilla]! No problem, we are happy to have you and I will always prefer over-reporting as opposed to not reporting :) I left you a note on your PR https://github.com/apache/nifi-minifi/pull/161#issuecomment-509722142 but had opened a similar PR last evening. Let me know your thoughts and we can get this wrapped up. I will scope out your other PR soon! Thank you! > Upgrade commons-deamon to 1.2.0 to make build green again > - > > Key: MINIFI-504 > URL: https://issues.apache.org/jira/browse/MINIFI-504 > Project: Apache NiFi MiNiFi > Issue Type: Bug > Components: Build >Affects Versions: 0.6.0 >Reporter: Nenad Nikolic >Priority: Blocker > Fix For: 0.6.0 > > Time Spent: 1h > Remaining Estimate: 0h > > After submitting the [PR #158|https://github.com/apache/nifi-minifi/pull/158] > for a broken test (and underlying class) I realized the build breaks for an > unrelated reason. The reason was {{commons-daemon}} version 1.1.0 being used > which is not available on the > [mirror|https://apache.claz.org/commons/daemon/binaries/windows/] anymore. > Currently, there's only one, newer version 1.2.0 hosted on the mirror that > should be used instead. > Because of this issue with the build system it's not possible to submit any > correct PRs at the moment, so I've set the priority to Blocker. > This Jira issue is FYI as another PR to upgrade {{commons-deamon}} to 1.2.0 > is coming right up. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (MINIFI-504) Upgrade commons-deamon to 1.2.0 to make build green again
[ https://issues.apache.org/jira/browse/MINIFI-504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16881398#comment-16881398 ] Nenad Nikolic commented on MINIFI-504: -- [~aldrin] thanks for responding so quickly! I've submitted PR [#161|https://github.com/apache/nifi-minifi/pull/161] which makes the build green again. Granted, it took two failed attempts to submit the correct PR but in the end it worked out. :P I'd like to contribute, so I hope you can this PR to master. After this hopefully happens, I will rebase and resubmit my fix for MINIFI-478. p.s. Only after finishing the work I searched MINIFI Jira project again to realize that there were already not one but two issues related to commons-daemon upgrade - MINIFI-502 and MINIFI-503. Sorry for not realizing this before and for creating another issue which is now linked to my green PR. :) > Upgrade commons-deamon to 1.2.0 to make build green again > - > > Key: MINIFI-504 > URL: https://issues.apache.org/jira/browse/MINIFI-504 > Project: Apache NiFi MiNiFi > Issue Type: Bug > Components: Build >Affects Versions: 0.6.0 >Reporter: Nenad Nikolic >Priority: Blocker > Fix For: 0.6.0 > > Time Spent: 1h > Remaining Estimate: 0h > > After submitting the [PR #158|https://github.com/apache/nifi-minifi/pull/158] > for a broken test (and underlying class) I realized the build breaks for an > unrelated reason. The reason was {{commons-daemon}} version 1.1.0 being used > which is not available on the > [mirror|https://apache.claz.org/commons/daemon/binaries/windows/] anymore. > Currently, there's only one, newer version 1.2.0 hosted on the mirror that > should be used instead. > Because of this issue with the build system it's not possible to submit any > correct PRs at the moment, so I've set the priority to Blocker. > This Jira issue is FYI as another PR to upgrade {{commons-deamon}} to 1.2.0 > is coming right up. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[nifi] branch master updated: Removed guava test dependency as it doesn't look like any of the test code requires it.
This is an automated email from the ASF dual-hosted git repository. mthomsen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi.git The following commit(s) were added to refs/heads/master by this push: new 95f5b22 Removed guava test dependency as it doesn't look like any of the test code requires it. 95f5b22 is described below commit 95f5b2278ca44b0a6d1be208cf404fae102c81e0 Author: thenatog AuthorDate: Mon Jun 10 14:33:37 2019 -0400 Removed guava test dependency as it doesn't look like any of the test code requires it. NIFI-5562 - Removed unnecessary usage of Guava NIFI-5562 - Updated Guava version to 28.0-jre NIFI-5562 - Upgraded Guava to 28.0-jre and Curator to 4.2.0 This closes #3577 Signed-off-by: Mike Thomsen --- .../nifi-datadog-reporting-task/pom.xml| 2 +- .../nifi-enrich-processors/pom.xml | 2 +- .../nifi-evtx-bundle/nifi-evtx-processors/pom.xml | 2 +- nifi-nar-bundles/nifi-framework-bundle/pom.xml | 18 +--- .../nifi-ignite-processors/pom.xml | 6 -- .../nifi-influxdb-processors/pom.xml | 2 +- .../nifi-kite-bundle/nifi-kite-processors/pom.xml | 2 +- .../nifi-kudu-bundle/nifi-kudu-processors/pom.xml | 6 -- .../org/apache/nifi/processors/kudu/PutKudu.java | 2 -- .../nifi-mongodb-processors/pom.xml| 2 +- .../nifi-rethinkdb-processors/pom.xml | 6 -- .../processors/rethinkdb/TestDeleteRethinkDB.java | 25 +++--- nifi-nar-bundles/nifi-standard-bundle/pom.xml | 2 +- nifi-toolkit/nifi-toolkit-admin/pom.xml| 2 +- .../nifi-toolkit-zookeeper-migrator/pom.xml| 2 +- 15 files changed, 36 insertions(+), 45 deletions(-) diff --git a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/pom.xml b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/pom.xml index 90f4eaf..9d1558e 100644 --- a/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/pom.xml +++ b/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/pom.xml @@ -61,7 +61,7 @@ com.google.guava guava -19.0 +28.0-jre diff --git a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/pom.xml b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/pom.xml index a6e027a..2df2606 100644 --- a/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/pom.xml +++ b/nifi-nar-bundles/nifi-enrich-bundle/nifi-enrich-processors/pom.xml @@ -68,7 +68,7 @@ com.google.guava guava -18.0 +28.0-jre org.powermock diff --git a/nifi-nar-bundles/nifi-evtx-bundle/nifi-evtx-processors/pom.xml b/nifi-nar-bundles/nifi-evtx-bundle/nifi-evtx-processors/pom.xml index daae745..a625fa7 100644 --- a/nifi-nar-bundles/nifi-evtx-bundle/nifi-evtx-processors/pom.xml +++ b/nifi-nar-bundles/nifi-evtx-bundle/nifi-evtx-processors/pom.xml @@ -45,7 +45,7 @@ com.google.guava guava -18.0 +28.0-jre commons-io diff --git a/nifi-nar-bundles/nifi-framework-bundle/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/pom.xml index e2366ac..825ebd5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/pom.xml @@ -320,7 +320,7 @@ com.google.guava guava -18.0 +28.0-jre @@ -592,12 +592,24 @@ org.apache.curator curator-framework -2.11.0 +4.2.0 + + +org.apache.zookeeper +zookeeper + + org.apache.curator curator-recipes -2.11.0 +4.2.0 + + +org.apache.zookeeper +zookeeper + + org.apache.commons diff --git a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/pom.xml b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/pom.xml index 71515ea..77ddb64 100644 --- a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/pom.xml +++ b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/pom.xml @@ -77,11 +77,5 @@ 1.10.0-SNAPSHOT test - -com.google.guava -guava -26.0-jre -test - diff --git a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/pom.xml
[jira] [Commented] (MINIFI-504) Upgrade commons-deamon to 1.2.0 to make build green again
[ https://issues.apache.org/jira/browse/MINIFI-504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16881315#comment-16881315 ] Aldrin Piri commented on MINIFI-504: [~shonzilla] Thanks for submitting this. The release happened quite recently and did not go detected as early thanks to the US holiday. Have some logistics to work out but should be taken care of shortly. > Upgrade commons-deamon to 1.2.0 to make build green again > - > > Key: MINIFI-504 > URL: https://issues.apache.org/jira/browse/MINIFI-504 > Project: Apache NiFi MiNiFi > Issue Type: Bug > Components: Build >Affects Versions: 0.6.0 >Reporter: Nenad Nikolic >Priority: Blocker > Fix For: 0.6.0 > > Time Spent: 10m > Remaining Estimate: 0h > > After submitting the [PR #158|https://github.com/apache/nifi-minifi/pull/158] > for a broken test (and underlying class) I realized the build breaks for an > unrelated reason. The reason was {{commons-daemon}} version 1.1.0 being used > which is not available on the > [mirror|https://apache.claz.org/commons/daemon/binaries/windows/] anymore. > Currently, there's only one, newer version 1.2.0 hosted on the mirror that > should be used instead. > Because of this issue with the build system it's not possible to submit any > correct PRs at the moment, so I've set the priority to Blocker. > This Jira issue is FYI as another PR to upgrade {{commons-deamon}} to 1.2.0 > is coming right up. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (MINIFI-504) Upgrade commons-deamon to 1.2.0 to make build green again
[ https://issues.apache.org/jira/browse/MINIFI-504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aldrin Piri resolved MINIFI-504. Resolution: Duplicate > Upgrade commons-deamon to 1.2.0 to make build green again > - > > Key: MINIFI-504 > URL: https://issues.apache.org/jira/browse/MINIFI-504 > Project: Apache NiFi MiNiFi > Issue Type: Bug > Components: Build >Affects Versions: 0.6.0 >Reporter: Nenad Nikolic >Priority: Blocker > Fix For: 0.6.0 > > Time Spent: 10m > Remaining Estimate: 0h > > After submitting the [PR #158|https://github.com/apache/nifi-minifi/pull/158] > for a broken test (and underlying class) I realized the build breaks for an > unrelated reason. The reason was {{commons-daemon}} version 1.1.0 being used > which is not available on the > [mirror|https://apache.claz.org/commons/daemon/binaries/windows/] anymore. > Currently, there's only one, newer version 1.2.0 hosted on the mirror that > should be used instead. > Because of this issue with the build system it's not possible to submit any > correct PRs at the moment, so I've set the priority to Blocker. > This Jira issue is FYI as another PR to upgrade {{commons-deamon}} to 1.2.0 > is coming right up. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (MINIFI-504) Upgrade commons-deamon to 1.2.0 to make build green again
[ https://issues.apache.org/jira/browse/MINIFI-504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nenad Nikolic updated MINIFI-504: - Description: After submitting the [PR #158|https://github.com/apache/nifi-minifi/pull/158] for a broken test (and underlying class) I realized the build breaks for an unrelated reason. The reason was {{commons-daemon}} version 1.1.0 being used which is not available on the [mirror|https://apache.claz.org/commons/daemon/binaries/windows/] anymore. Currently, there's only one, newer version 1.2.0 hosted on the mirror that should be used instead. Because of this issue with the build system it's not possible to submit any correct PRs at the moment so I've set the priority to Blocker. This Jira issue is FYI as another PR to upgrade {{commons-deamon}} to 1.2.0 is coming right up. was: Upon submitting the [PR #158|https://github.com/apache/nifi-minifi/pull/158] for a broken test (and underlying class) I realized that commons-daemon version 1.1.0 used at the moment is not available on the mirror anymore. Currently, there's only one, newer version 1.2.0 hosted at the moment. Because of this issue with the build system it's not possible to submit any correct PRs at the moment so I've set the priority to Blocker. This Jira issue is FYI as another PR to upgrade commons-deamon to 1.2.0 is coming right up. > Upgrade commons-deamon to 1.2.0 to make build green again > - > > Key: MINIFI-504 > URL: https://issues.apache.org/jira/browse/MINIFI-504 > Project: Apache NiFi MiNiFi > Issue Type: Bug > Components: Build >Affects Versions: 0.6.0 >Reporter: Nenad Nikolic >Priority: Blocker > Fix For: 0.6.0 > > > After submitting the [PR #158|https://github.com/apache/nifi-minifi/pull/158] > for a broken test (and underlying class) I realized the build breaks for an > unrelated reason. The reason was {{commons-daemon}} version 1.1.0 being used > which is not available on the > [mirror|https://apache.claz.org/commons/daemon/binaries/windows/] anymore. > Currently, there's only one, newer version 1.2.0 hosted on the mirror that > should be used instead. > Because of this issue with the build system it's not possible to submit any > correct PRs at the moment so I've set the priority to Blocker. > This Jira issue is FYI as another PR to upgrade {{commons-deamon}} to 1.2.0 > is coming right up. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (MINIFI-504) Upgrade commons-deamon to 1.2.0 to make build green again
[ https://issues.apache.org/jira/browse/MINIFI-504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nenad Nikolic updated MINIFI-504: - Description: After submitting the [PR #158|https://github.com/apache/nifi-minifi/pull/158] for a broken test (and underlying class) I realized the build breaks for an unrelated reason. The reason was {{commons-daemon}} version 1.1.0 being used which is not available on the [mirror|https://apache.claz.org/commons/daemon/binaries/windows/] anymore. Currently, there's only one, newer version 1.2.0 hosted on the mirror that should be used instead. Because of this issue with the build system it's not possible to submit any correct PRs at the moment, so I've set the priority to Blocker. This Jira issue is FYI as another PR to upgrade {{commons-deamon}} to 1.2.0 is coming right up. was: After submitting the [PR #158|https://github.com/apache/nifi-minifi/pull/158] for a broken test (and underlying class) I realized the build breaks for an unrelated reason. The reason was {{commons-daemon}} version 1.1.0 being used which is not available on the [mirror|https://apache.claz.org/commons/daemon/binaries/windows/] anymore. Currently, there's only one, newer version 1.2.0 hosted on the mirror that should be used instead. Because of this issue with the build system it's not possible to submit any correct PRs at the moment so I've set the priority to Blocker. This Jira issue is FYI as another PR to upgrade {{commons-deamon}} to 1.2.0 is coming right up. > Upgrade commons-deamon to 1.2.0 to make build green again > - > > Key: MINIFI-504 > URL: https://issues.apache.org/jira/browse/MINIFI-504 > Project: Apache NiFi MiNiFi > Issue Type: Bug > Components: Build >Affects Versions: 0.6.0 >Reporter: Nenad Nikolic >Priority: Blocker > Fix For: 0.6.0 > > > After submitting the [PR #158|https://github.com/apache/nifi-minifi/pull/158] > for a broken test (and underlying class) I realized the build breaks for an > unrelated reason. The reason was {{commons-daemon}} version 1.1.0 being used > which is not available on the > [mirror|https://apache.claz.org/commons/daemon/binaries/windows/] anymore. > Currently, there's only one, newer version 1.2.0 hosted on the mirror that > should be used instead. > Because of this issue with the build system it's not possible to submit any > correct PRs at the moment, so I've set the priority to Blocker. > This Jira issue is FYI as another PR to upgrade {{commons-deamon}} to 1.2.0 > is coming right up. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (MINIFI-504) Upgrade commons-deamon to 1.2.0 to make build green again
Nenad Nikolic created MINIFI-504: Summary: Upgrade commons-deamon to 1.2.0 to make build green again Key: MINIFI-504 URL: https://issues.apache.org/jira/browse/MINIFI-504 Project: Apache NiFi MiNiFi Issue Type: Bug Components: Build Affects Versions: 0.6.0 Reporter: Nenad Nikolic Fix For: 0.6.0 Upon submitting the [PR #158|https://github.com/apache/nifi-minifi/pull/158] for a broken test (and underlying class) I realized that commons-daemon version 1.1.0 used at the moment is not available on the mirror anymore. Currently, there's only one, newer version 1.2.0 hosted at the moment. Because of this issue with the build system it's not possible to submit any correct PRs at the moment so I've set the priority to Blocker. This Jira issue is FYI as another PR to upgrade commons-deamon to 1.2.0 is coming right up. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[nifi] branch master updated: NIFI-6385 Added signal.id penalization - Add additional doc about best practices.
This is an automated email from the ASF dual-hosted git repository. markap14 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi.git The following commit(s) were added to refs/heads/master by this push: new 31097c9 NIFI-6385 Added signal.id penalization - Add additional doc about best practices. 31097c9 is described below commit 31097c96d6f77e1493c331f76838190d4df2b9e2 Author: Koji Kawamura AuthorDate: Thu Jun 20 10:38:22 2019 +0900 NIFI-6385 Added signal.id penalization - Add additional doc about best practices. This closes #3540. Signed-off-by: Mark Payne --- .../org/apache/nifi/processors/standard/Wait.java | 55 .../additionalDetails.html | 277 + .../apache/nifi/processors/standard/TestWait.java | 29 +++ 3 files changed, 361 insertions(+) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java index e297556..45ffcb2 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -43,6 +44,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; @@ -212,6 +214,24 @@ public class Wait extends AbstractProcessor { .expressionLanguageSupported(ExpressionLanguageScope.NONE) .build(); +public static final PropertyDescriptor WAIT_PENALTY_DURATION = new PropertyDescriptor.Builder() +.name("wait-penalty-duration") +.displayName("Wait Penalty Duration") +.description("If configured, after a signal identifier got processed but did not meet the release criteria," + +" the signal identifier is penalized and FlowFiles having the signal identifier" + +" will not be processed again for the specified period of time," + +" so that the signal identifier will not block others to be processed." + +" This can be useful for use cases where a Wait processor is expected to process multiple signal identifiers," + +" and each signal identifier has multiple FlowFiles," + +" and also the order of releasing FlowFiles is important within a signal identifier." + +" The FlowFile order can be configured with Prioritizers." + +" IMPORTANT: There is a limitation of number of queued signals can be processed," + +" and Wait processor may not be able to check all queued signal ids. See additional details for the best practice.") +.required(false) +.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) +.expressionLanguageSupported(ExpressionLanguageScope.NONE) +.build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") .description("A FlowFile with a matching release signal in the cache will be routed to this relationship") @@ -234,6 +254,8 @@ public class Wait extends AbstractProcessor { private final Set relationships; +private final Map signalIdPenalties = new HashMap<>(); + public Wait() { final Set rels = new HashSet<>(); rels.add(REL_SUCCESS); @@ -255,6 +277,7 @@ public class Wait extends AbstractProcessor { descriptors.add(DISTRIBUTED_CACHE_SERVICE); descriptors.add(ATTRIBUTE_COPY_MODE); descriptors.add(WAIT_MODE); +descriptors.add(WAIT_PENALTY_DURATION); return descriptors; } @@ -280,6 +303,19 @@ public class Wait extends AbstractProcessor { final List failedFilteringFlowFiles = new ArrayList<>(); final Supplier acceptResultSupplier = () -> bufferedCount.incrementAndGet() == bufferCount ? ACCEPT_AND_TERMINATE : ACCEPT_AND_CONTINUE; + +// Clear expired penalties. +if (!signalIdPenalties.isEmpty()) { +final Iterator> penaltyIterator = signalIdPenalties.entrySet().iterator(); +final long now = System.currentTimeMillis(); +
[nifi-minifi-cpp] branch master updated: MINIFICPP-733 - Redirect librdkafka log to proper Logger
This is an automated email from the ASF dual-hosted git repository. aboda pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git The following commit(s) were added to refs/heads/master by this push: new 7936e40 MINIFICPP-733 - Redirect librdkafka log to proper Logger 7936e40 is described below commit 7936e40e872c43b1780984da35a893405fce78a2 Author: Daniel Bakai AuthorDate: Mon Jul 8 13:22:26 2019 +0200 MINIFICPP-733 - Redirect librdkafka log to proper Logger Signed-off-by: Arpad Boda This closes #607 --- extensions/librdkafka/PublishKafka.cpp | 5 ++- extensions/librdkafka/PublishKafka.h | 71 -- 2 files changed, 71 insertions(+), 5 deletions(-) diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp index db4b84b..a1887a2 100644 --- a/extensions/librdkafka/PublishKafka.cpp +++ b/extensions/librdkafka/PublishKafka.cpp @@ -272,7 +272,7 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr } } -// Add all of the dynamic properties as librdkafka configurations + // Add all of the dynamic properties as librdkafka configurations const auto _prop_keys = context->getDynamicPropertyKeys(); logger_->log_info("PublishKafka registering %d librdkafka dynamic properties", dynamic_prop_keys.size()); @@ -286,6 +286,9 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr } } + // Set the logger callback + rd_kafka_conf_set_log_cb(conf_, KafkaConnection::logCallback); + auto producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf_, errstr, sizeof(errstr)); if (!producer) { diff --git a/extensions/librdkafka/PublishKafka.h b/extensions/librdkafka/PublishKafka.h index 726632b..8a23dee 100644 --- a/extensions/librdkafka/PublishKafka.h +++ b/extensions/librdkafka/PublishKafka.h @@ -27,6 +27,7 @@ #include "core/Resource.h" #include "core/Property.h" #include "core/logging/LoggerConfiguration.h" +#include "core/logging/Logger.h" #include "rdkafka.h" #include @@ -68,7 +69,9 @@ class KafkaTopic { ~KafkaTopic() { if (topic_reference_) { rd_kafka_topic_destroy(topic_reference_); - topic_reference_ = 0; +} +if (topic_conf_) { + rd_kafka_topic_conf_destroy(topic_conf_); } } @@ -114,7 +117,8 @@ class KafkaConnection { public: explicit KafkaConnection(const KafkaConnectionKey ) - : conf_(nullptr), + : logger_(logging::LoggerFactory::getLogger()), +conf_(nullptr), kafka_connection_(nullptr) { lease_ = false; initialized_ = false; @@ -127,11 +131,23 @@ class KafkaConnection { void remove() { topics_.clear(); +removeConnection(); + } + + void removeConnection() { if (kafka_connection_) { rd_kafka_flush(kafka_connection_, 10 * 1000); /* wait for max 10 seconds */ rd_kafka_destroy(kafka_connection_); - kafka_connection_ = 0; + modifyLoggers([&](std::unordered_map>& loggers) { +loggers.erase(kafka_connection_); + }); + kafka_connection_ = nullptr; } +if (conf_) { + rd_kafka_conf_destroy(conf_); + conf_ = nullptr; +} +initialized_ = false; } bool initialized() { @@ -140,9 +156,13 @@ class KafkaConnection { void setConnection(rd_kafka_t *producer, rd_kafka_conf_t *conf) { std::lock_guard lock(mutex_); +removeConnection(); kafka_connection_ = producer; conf_ = conf; initialized_ = true; +modifyLoggers([&](std::unordered_map>& loggers) { + loggers[producer] = logger_; +}); } rd_kafka_conf_t *getConf() { @@ -180,8 +200,43 @@ class KafkaConnection { topics_.insert(std::make_pair(topicName, topic)); } + static void logCallback(const rd_kafka_t* rk, int level, const char* /*fac*/, const char* buf) { +std::shared_ptr logger; +try { + modifyLoggers([&](std::unordered_map>& loggers) { +logger = loggers.at(rk).lock(); + }); +} catch (...) { +} + +if (!logger) { + return; +} + +switch (level) { + case 0: // LOG_EMERG + case 1: // LOG_ALERT + case 2: // LOG_CRIT + case 3: // LOG_ERR +logging::LOG_ERROR(logger) << buf; +break; + case 4: // LOG_WARNING +logging::LOG_WARN(logger) << buf; +break; + case 5: // LOG_NOTICE + case 6: // LOG_INFO +logging::LOG_INFO(logger) << buf; +break; + case 7: // LOG_DEBUG +logging::LOG_DEBUG(logger) << buf; +break; +} + } + private: + std::shared_ptr logger_; + std::mutex mutex_; std::atomic lease_; @@ -190,10 +245,18 @@ class KafkaConnection { KafkaConnectionKey key_; - std::map > topics_; + std::map> topics_; rd_kafka_conf_t *conf_; rd_kafka_t *kafka_connection_; + + static void modifyLoggers(const std::function>&)>& func) { +static std::mutex loggers_mutex; +
[nifi-minifi-cpp] branch master updated: MINIFICPP-828 - Create a common regex utility
This is an automated email from the ASF dual-hosted git repository. aboda pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git The following commit(s) were added to refs/heads/master by this push: new 6c028c5 MINIFICPP-828 - Create a common regex utility 6c028c5 is described below commit 6c028c5441ac86222a717933c7076b4edd856b42 Author: Nghia Le AuthorDate: Mon Jul 8 17:03:19 2019 +0200 MINIFICPP-828 - Create a common regex utility Signed-off-by: Arpad Boda This closes #606 --- extensions/http-curl/client/HTTPClient.cpp | 21 +-- extensions/sftp/processors/ListSFTP.cpp| 57 +-- extensions/sftp/processors/ListSFTP.h | 15 +- .../standard-processors/processors/ExtractText.cpp | 102 +++-- .../standard-processors/processors/GetFile.cpp | 18 +-- .../standard-processors/processors/TailFile.cpp| 19 +-- libminifi/include/Exception.h | 3 +- libminifi/include/utils/RegexUtils.h | 84 +++ libminifi/src/utils/RegexUtils.cpp | 165 + libminifi/test/unit/RegexUtilsTests.cpp| 72 + 10 files changed, 371 insertions(+), 185 deletions(-) diff --git a/extensions/http-curl/client/HTTPClient.cpp b/extensions/http-curl/client/HTTPClient.cpp index f2be475..b42ed56 100644 --- a/extensions/http-curl/client/HTTPClient.cpp +++ b/extensions/http-curl/client/HTTPClient.cpp @@ -16,6 +16,7 @@ * limitations under the License. */ #include "HTTPClient.h" +#include "Exception.h" #include #include #include @@ -23,6 +24,7 @@ #include #include #include "utils/StringUtils.h" +#include "utils/RegexUtils.h" namespace org { namespace apache { @@ -335,21 +337,12 @@ void HTTPClient::set_request_method(const std::string method) { bool HTTPClient::matches(const std::string , const std::string ) { if (sregex == ".*") return true; - -#ifdef WIN32 - std::regex rgx(sregex); - return std::regex_match(value, rgx); -#else - regex_t regex; - int ret = regcomp(, sregex.c_str(), 0); - if (ret) -return false; - ret = regexec(, value.c_str(), (size_t) 0, NULL, 0); - regfree(); - if (ret) + try { +utils::Regex rgx(sregex); +return rgx.match(value); + } catch (const Exception ) { return false; -#endif - return true; + } } void HTTPClient::configure_secure_connection(CURL *http_session) { diff --git a/extensions/sftp/processors/ListSFTP.cpp b/extensions/sftp/processors/ListSFTP.cpp index e4b56be..c53a887 100644 --- a/extensions/sftp/processors/ListSFTP.cpp +++ b/extensions/sftp/processors/ListSFTP.cpp @@ -36,6 +36,7 @@ #include "utils/ByteArrayCallback.h" #include "utils/TimeUtil.h" #include "utils/StringUtils.h" +#include "utils/RegexUtils.h" #include "utils/ScopeGuard.h" #include "utils/file/FileUtils.h" #include "core/FlowFile.h" @@ -184,14 +185,6 @@ ListSFTP::ListSFTP(std::string name, utils::Identifier uuid /*= utils::Identifie } ListSFTP::~ListSFTP() { -#ifndef WIN32 - if (file_filter_regex_set_) { -regfree(_file_filter_regex_); - } - if (path_filter_regex_set_) { -regfree(_path_filter_regex_); - } -#endif } void ListSFTP::onSchedule(const std::shared_ptr , const std::shared_ptr ) { @@ -214,50 +207,24 @@ void ListSFTP::onSchedule(const std::shared_ptr , utils::StringUtils::StringToBool(value, follow_symlink_); } if (context->getProperty(FileFilterRegex.getName(), file_filter_regex_)) { -#ifndef WIN32 -if (file_filter_regex_set_) { - regfree(_file_filter_regex_); -} -int ret = regcomp(_file_filter_regex_, file_filter_regex_.c_str(), 0); -if (ret != 0) { - logger_->log_error("Failed to compile File Filter Regex \"%s\"", file_filter_regex_.c_str()); - file_filter_regex_set_ = false; -} else { - file_filter_regex_set_ = true; -} -#else try { - compiled_file_filter_regex_ = std::regex(file_filter_regex_); + compiled_file_filter_regex_ = utils::Regex(file_filter_regex_); file_filter_regex_set_ = true; -} catch (std::regex_error&) { +} catch (const Exception ) { logger_->log_error("Failed to compile File Filter Regex \"%s\"", file_filter_regex_.c_str()); file_filter_regex_set_ = false; } -#endif } else { file_filter_regex_set_ = false; } if (context->getProperty(PathFilterRegex.getName(), path_filter_regex_)) { -#ifndef WIN32 -if (path_filter_regex_set_) { - regfree(_path_filter_regex_); -} -int ret = regcomp(_path_filter_regex_, path_filter_regex_.c_str(), 0); -if (ret != 0) { - logger_->log_error("Failed to compile Path Filter Regex \"%s\"", path_filter_regex_.c_str()); - file_filter_regex_set_ = false; -} else { - path_filter_regex_set_ = true; -} -#else try { - compiled_path_filter_regex_ = std::regex(path_filter_regex_); +