[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #835: MINIFICPP-1281 - Improve test performance by using event polling instead of sleep by sync
hunyadi-dev commented on a change in pull request #835: URL: https://github.com/apache/nifi-minifi-cpp/pull/835#discussion_r41303 ## File path: extensions/http-curl/tests/HttpGetIntegrationTest.cpp ## @@ -74,6 +71,8 @@ class HttpResponder : public CivetHandler { }; int main(int argc, char **argv) { + using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime; Review comment: I don't think we have a test namespace. Some test utilities are in `utils` and some (e.g. our whole test plan) are in global scope. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
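The diff above pulls in `verifyLogLinePresenceInPollTime`, the utility this PR uses to replace fixed sleeps. A minimal sketch of the underlying idea — poll a captured log buffer for an expected line until a deadline, returning as soon as it appears — might look like the following. `LogBuffer` is a hypothetical stand-in for minifi's test logging utilities; only the polling pattern is the point, and the real implementation (variadic, integrated with `LogTestController`) differs.

```cpp
#include <chrono>
#include <mutex>
#include <string>
#include <thread>

// Hypothetical stand-in for minifi's captured test log sink.
class LogBuffer {
 public:
  void append(const std::string& line) {
    std::lock_guard<std::mutex> guard(mutex_);
    contents_ += line + "\n";
  }
  bool contains(const std::string& needle) {
    std::lock_guard<std::mutex> guard(mutex_);
    return contents_.find(needle) != std::string::npos;
  }

 private:
  std::mutex mutex_;
  std::string contents_;
};

// Poll the log for an expected line instead of sleeping a fixed duration:
// succeed as soon as the line shows up, fail only after the full wait time.
bool verifyLogLinePresenceInPollTime(LogBuffer& log,
                                     std::chrono::milliseconds wait_time,
                                     const std::string& expected_line) {
  const auto deadline = std::chrono::steady_clock::now() + wait_time;
  while (true) {
    if (log.contains(expected_line)) return true;
    if (std::chrono::steady_clock::now() >= deadline) return false;
    std::this_thread::sleep_for(std::chrono::milliseconds(5));
  }
}
```

The payoff is exactly what the PR title claims: a test that used to sleep for a worst-case duration now returns as soon as the expected event is observed.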
[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #835: MINIFICPP-1281 - Improve test performance by using event polling instead of sleep by sync
hunyadi-dev commented on a change in pull request #835: URL: https://github.com/apache/nifi-minifi-cpp/pull/835#discussion_r455548728 ## File path: extensions/http-curl/tests/HTTPSiteToSiteTests.cpp ## @@ -81,7 +81,10 @@ class SiteToSiteTestHarness : public CoapIntegrationBase { void cleanup() override {} - void runAssertions() override {} + void runAssertions() override { +// There is nothing to verify here, but we are expected to wait for all parallel events to execute +std::this_thread::sleep_for(std::chrono::milliseconds(wait_time_)); Review comment: Because we don't want to bring that function back; it would be abused again. It would also probably wait at an incorrect place with this PR's design. Hopefully we can spare these declarations after rebasing on Adam D.'s PR. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #835: MINIFICPP-1281 - Improve test performance by using event polling instead of sleep by sync
hunyadi-dev commented on a change in pull request #835: URL: https://github.com/apache/nifi-minifi-cpp/pull/835#discussion_r455547307 ## File path: extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp ## @@ -126,34 +125,34 @@ int main(int argc, char **argv) { // now let's disable one of the controller services. std::shared_ptr cs_id = controller->getControllerServiceNode("ID"); assert(cs_id != nullptr); - { -std::lock_guard lock(control_mutex); -controller->disableControllerService(cs_id); -disabled = true; -waitToVerifyProcessor(); - } { std::lock_guard lock(control_mutex); controller->enableControllerService(cs_id); disabled = false; -waitToVerifyProcessor(); } std::shared_ptr mock_cont = controller->getControllerServiceNode("MockItLikeIts1995"); - assert(cs_id->enabled()); -{ + const bool test_success_01 = verifyEventHappenedInPollTime(std::chrono::seconds(4), [&cs_id] { +return cs_id->enabled(); + }); + { std::lock_guard lock(control_mutex); controller->disableReferencingServices(mock_cont); disabled = true; -waitToVerifyProcessor(); } -assert(cs_id->enabled() == false); -{ + const bool test_success_02 = verifyEventHappenedInPollTime(std::chrono::seconds(2), [&cs_id] { +return !cs_id->enabled(); + }); + { std::lock_guard lock(control_mutex); controller->enableReferencingServices(mock_cont); disabled = false; -waitToVerifyProcessor(); } - assert(cs_id->enabled() == true); + const bool test_success_03 = verifyEventHappenedInPollTime(std::chrono::seconds(2), [&cs_id] { Review comment: Will extract them out into a single lambda function. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
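The diff above checks `cs_id->enabled()` with three near-identical lambdas (`test_success_01` through `test_success_03`), and the reviewer promises to extract them into a single lambda. A sketch of that cleanup, with a simplified stand-in for `verifyEventHappenedInPollTime` so the example is self-contained (the real utility lives in the minifi test code; `makeEnabledStateCheck` and the `std::atomic<bool>` stand-in for the controller service are assumptions):

```cpp
#include <atomic>
#include <chrono>
#include <functional>
#include <thread>

// Simplified stand-in for the PR's polling utility: re-check a predicate
// until it holds or the wait time elapses.
bool verifyEventHappenedInPollTime(std::chrono::milliseconds wait,
                                   const std::function<bool()>& check) {
  const auto deadline = std::chrono::steady_clock::now() + wait;
  do {
    if (check()) return true;
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
  } while (std::chrono::steady_clock::now() < deadline);
  return check();  // one final check at the deadline
}

// One lambda factory replaces the three separate copies in the diff:
// capture the observed state and the expected value once.
auto makeEnabledStateCheck(const std::atomic<bool>& enabled, bool expected) {
  return [&enabled, expected] { return enabled.load() == expected; };
}
```

With this, each assertion in the test becomes `verifyEventHappenedInPollTime(timeout, makeEnabledStateCheck(state, expected))`, so enabling/disabling checks differ only in the `expected` argument.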
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #840: MINIFICPP-1293 Fix the failing PropertyTests
arpadboda commented on a change in pull request #840: URL: https://github.com/apache/nifi-minifi-cpp/pull/840#discussion_r455522730 ## File path: libminifi/test/unit/TimeUtilTests.cpp ## @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "utils/TimeUtil.h" +#include "../TestBase.h" + +namespace { + constexpr int ONE_HOUR = 60 * 60; + constexpr int ONE_DAY = 24 * ONE_HOUR; + + struct tm createTm(int year, int month, int day, int hour, int minute, int second, bool is_dst = false) { +struct tm date_time; +date_time.tm_year = year - 1900; +date_time.tm_mon = month - 1; Review comment: Do I understand correctly that months are stored in [0-11] range, but days are stored in [1-31]? I know that this representation is not your idea, just looks a bit weird :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
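The reviewer's reading is correct: in `struct tm`, months are zero-based while days of month are one-based, and years count from 1900. A self-contained version of the `createTm` helper from the diff, with the asymmetry called out in comments (the signature mirrors the PR's test code; the field assignments beyond those visible in the diff are assumed):

```cpp
#include <ctime>

// Builds a struct tm from human-readable date components, papering over the
// C library's mixed conventions:
//   tm_year - years since 1900
//   tm_mon  - [0, 11], i.e. 0 = January  (zero-based)
//   tm_mday - [1, 31]                    (one-based, unlike tm_mon)
struct tm createTm(int year, int month, int day,
                   int hour, int minute, int second, bool is_dst = false) {
  struct tm date_time{};               // zero-init so unset fields are 0
  date_time.tm_year = year - 1900;
  date_time.tm_mon = month - 1;        // convert 1-based input to 0-based
  date_time.tm_mday = day;             // already 1-based, no adjustment
  date_time.tm_hour = hour;
  date_time.tm_min = minute;
  date_time.tm_sec = second;
  date_time.tm_isdst = is_dst ? 1 : 0;
  return date_time;
}
```

The inconsistency is inherited from C's `<time.h>`; a helper like this keeps the off-by-one conversions in exactly one place.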
[GitHub] [nifi-minifi-cpp] arpadboda closed pull request #836: MINIFICPP-1248 Create unit tests for the ConsumeWindowsEventLog processor
arpadboda closed pull request #836: URL: https://github.com/apache/nifi-minifi-cpp/pull/836 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi] esecules commented on pull request #4401: NIFI-7622 Use param context name from inside nested versioned PG when…
esecules commented on pull request #4401: URL: https://github.com/apache/nifi/pull/4401#issuecomment-659026103 This exception only gets thrown when you're pulling the flow from the registry. It would be better if it blocked versioning it to the registry in the first place. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (NIFI-7645) Implement AMQPRecord processors
[ https://issues.apache.org/jira/browse/NIFI-7645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Peter Turcsanyi updated NIFI-7645: -- Description: Reimplement PublishAMQP / ConsumeAMQP processors * support record oriented processing * overcome the issues with the current AMQP processors, like NIFI-6312 and NIFI-5896 > Implement AMQPRecord processors > --- > > Key: NIFI-7645 > URL: https://issues.apache.org/jira/browse/NIFI-7645 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions >Reporter: Peter Turcsanyi >Assignee: Peter Turcsanyi >Priority: Major > > Reimplement PublishAMQP / ConsumeAMQP processors > * support record oriented processing > * overcome the issues with the current AMQP processors, like NIFI-6312 and > NIFI-5896 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (NIFI-7645) Implement AMQPRecord processors
[ https://issues.apache.org/jira/browse/NIFI-7645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Peter Turcsanyi updated NIFI-7645: -- Component/s: Extensions > Implement AMQPRecord processors > --- > > Key: NIFI-7645 > URL: https://issues.apache.org/jira/browse/NIFI-7645 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions >Reporter: Peter Turcsanyi >Assignee: Peter Turcsanyi >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (NIFI-6312) AMQP processors seem to have thread cleanup issues
[ https://issues.apache.org/jira/browse/NIFI-6312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17158707#comment-17158707 ] Peter Turcsanyi commented on NIFI-6312: --- Opened a PR ([https://github.com/apache/nifi/pull/4411]) with fixes for the most critical connection handling / threading issues. Also created a jira for reimplementing AMQP processors with record-oriented processing support (NIFI-7645). > AMQP processors seem to have thread cleanup issues > -- > > Key: NIFI-6312 > URL: https://issues.apache.org/jira/browse/NIFI-6312 > Project: Apache NiFi > Issue Type: Bug > Components: Extensions >Affects Versions: 1.9.0 >Reporter: Robert Bruno >Assignee: Peter Turcsanyi >Priority: Major > Time Spent: 10m > Remaining Estimate: 0h > > At a minimum the ConsumeAMQP processor exhibits this behavior but the > PublishAMQP may as well. > If ConsumeAMQP is listening to a working AMQP server and then that server > name is no longer resolvable errors begin to show up in logs saying the > hostname can't be resolve. This is expected. > What isn't expected is if you then turn off the processor or even delete the > processor the error message persists. The only way to resolve this is > restarting the nifi node. > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (NIFI-6312) AMQP processors seem to have thread cleanup issues
[ https://issues.apache.org/jira/browse/NIFI-6312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Peter Turcsanyi updated NIFI-6312: -- Status: Patch Available (was: Open) > AMQP processors seem to have thread cleanup issues > -- > > Key: NIFI-6312 > URL: https://issues.apache.org/jira/browse/NIFI-6312 > Project: Apache NiFi > Issue Type: Bug > Components: Extensions >Affects Versions: 1.9.0 >Reporter: Robert Bruno >Assignee: Peter Turcsanyi >Priority: Major > Time Spent: 10m > Remaining Estimate: 0h > > At a minimum the ConsumeAMQP processor exhibits this behavior but the > PublishAMQP may as well. > If ConsumeAMQP is listening to a working AMQP server and then that server > name is no longer resolvable errors begin to show up in logs saying the > hostname can't be resolve. This is expected. > What isn't expected is if you then turn off the processor or even delete the > processor the error message persists. The only way to resolve this is > restarting the nifi node. > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [nifi] turcsanyip opened a new pull request #4411: NIFI-6312: Improved connection handling in AMQP processors
turcsanyip opened a new pull request #4411: URL: https://github.com/apache/nifi/pull/4411 Disable connection automatic recovery which can lead to uncontrolled/stale threads. Handle the recovery in the processors instead. Use poisoning in case of errors, then discarding and recreating the poisoned consumer/publisher. https://issues.apache.org/jira/browse/NIFI-6312 Thank you for submitting a contribution to Apache NiFi. Please provide a short description of the PR here: Description of PR _Enables X functionality; fixes bug NIFI-._ In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [ ] Does your PR title start with **NIFI-** where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [ ] Has your PR been rebased against the latest commit within the target branch (typically `main`)? - [ ] Is your initial contribution a single, squashed commit? _Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not `squash` or use `--force` when pushing to allow for clean monitoring of changes._ ### For code changes: - [ ] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder? - [ ] Have you written or updated unit tests to verify your changes? - [ ] Have you verified that the full build is successful on JDK 8? - [ ] Have you verified that the full build is successful on JDK 11? - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the `LICENSE` file, including the main `LICENSE` file under `nifi-assembly`? 
- [ ] If applicable, have you updated the `NOTICE` file, including the main `NOTICE` file found under `nifi-assembly`? - [ ] If adding new Properties, have you added `.displayName` in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
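The PR description above outlines a "poisoning" strategy: disable the client library's automatic recovery, mark a consumer/publisher as poisoned on any error, then discard and recreate it instead of reusing it. The actual change is Java (NiFi's AMQP bundle); the following is only a language-neutral sketch of the pattern, with `AmqpWorker` and `getUsableWorker` as illustrative names, not the real NiFi classes:

```cpp
#include <functional>
#include <memory>
#include <stdexcept>

// Wrapper around a consumer/publisher that can never be reused after a
// failure: the first error poisons it permanently.
class AmqpWorker {
 public:
  bool poisoned() const { return poisoned_; }

  void execute(const std::function<void()>& op) {
    try {
      op();
    } catch (const std::exception&) {
      poisoned_ = true;  // a failed worker may hold stale state/threads
      throw;             // let the caller handle (e.g. route to failure)
    }
  }

 private:
  bool poisoned_ = false;
};

// On each use, discard a poisoned (or missing) worker and build a fresh one,
// instead of relying on the library's automatic connection recovery.
std::unique_ptr<AmqpWorker> getUsableWorker(std::unique_ptr<AmqpWorker> current) {
  if (!current || current->poisoned()) {
    current = std::make_unique<AmqpWorker>();
  }
  return current;
}
```

The design choice, as the PR states it, is that controlled recreation in the processor avoids the uncontrolled/stale threads that automatic recovery inside the client library can leave behind (the symptom reported in NIFI-6312).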
[jira] [Assigned] (NIFI-6312) AMQP processors seem to have thread cleanup issues
[ https://issues.apache.org/jira/browse/NIFI-6312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Peter Turcsanyi reassigned NIFI-6312: - Assignee: Peter Turcsanyi > AMQP processors seem to have thread cleanup issues > -- > > Key: NIFI-6312 > URL: https://issues.apache.org/jira/browse/NIFI-6312 > Project: Apache NiFi > Issue Type: Bug > Components: Extensions >Affects Versions: 1.9.0 >Reporter: Robert Bruno >Assignee: Peter Turcsanyi >Priority: Major > > At a minimum the ConsumeAMQP processor exhibits this behavior but the > PublishAMQP may as well. > If ConsumeAMQP is listening to a working AMQP server and then that server > name is no longer resolvable errors begin to show up in logs saying the > hostname can't be resolve. This is expected. > What isn't expected is if you then turn off the processor or even delete the > processor the error message persists. The only way to resolve this is > restarting the nifi node. > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (NIFI-7645) Implement AMQPRecord processors
Peter Turcsanyi created NIFI-7645: - Summary: Implement AMQPRecord processors Key: NIFI-7645 URL: https://issues.apache.org/jira/browse/NIFI-7645 Project: Apache NiFi Issue Type: Improvement Reporter: Peter Turcsanyi Assignee: Peter Turcsanyi -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (NIFI-7628) Parameter context in multi tenant environment
[ https://issues.apache.org/jira/browse/NIFI-7628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17158700#comment-17158700 ] Bryan Bende commented on NIFI-7628: --- You can also have read/write policies on parameter contexts, in order to bind one to a process group you need read permissions on the context and write permissions on the process group. > Parameter context in multi tenant environment > - > > Key: NIFI-7628 > URL: https://issues.apache.org/jira/browse/NIFI-7628 > Project: Apache NiFi > Issue Type: Improvement > Components: Flow Versioning, SDLC >Reporter: naveen kumar saharan >Priority: Major > > In multi-tenant environment, its not possible to restrict the parameters for > different BU/vertical/tenant. > Currently either you can give access(view or modify) to a PG or not. > > But in multi-tenant environment we want the BU/vertical tenant specific > access on Param context so that people don't disturb each others work and its > important wrt to security and compliance also -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (NIFI-7644) New Relic reporting task
Ihar Hulevich created NIFI-7644: --- Summary: New Relic reporting task Key: NIFI-7644 URL: https://issues.apache.org/jira/browse/NIFI-7644 Project: Apache NiFi Issue Type: New Feature Reporter: Ihar Hulevich Integration between NiFi and New Relic for sending metrics; this is expected to be a separate New Relic bundle. The reporting task should have two options for integration with New Relic: Agent and API. For example, see how it's implemented in micrometer: [https://github.com/micrometer-metrics/micrometer/blob/master/implementations/micrometer-registry-new-relic/src/main/java/io/micrometer/newrelic/NewRelicInsightsAgentClientProvider.java] [https://github.com/micrometer-metrics/micrometer/blob/master/implementations/micrometer-registry-new-relic/src/main/java/io/micrometer/newrelic/NewRelicInsightsApiClientProvider.java] The number of metrics should be no fewer than in the Datadog integration [https://github.com/apache/nifi/blob/main/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/metrics/MetricNames.java#L24-L73] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (NIFI-7643) Unpackcontent writing absolute path property does not make sense
[ https://issues.apache.org/jira/browse/NIFI-7643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tamás Bunth updated NIFI-7643: -- Description: Steps to reproduce: # Create a tar or zip file with some arbitrary content in its root. The bug occurs only when using these two formats. # Create a flow: GetFile -> UnpackContent -> LogAttribute. # Set GetFile to fetch the compressed test file from the file system. Set UnpackContent to use the appropriate format. Unpackcontent writes an attribute "file.absolutePath" which is currently the relative path of the unpacked content concatenated with the current working directory (where Nifi has been run). E.g. if I unpacked a file with relative path "egg/ham.txt" and I run Nifi from "/usr/bin", "file.absolutePath" would be "/usr/bin/egg/ham.txt". In my opinion, "absolute path" does not make much sense in this context. I suggest removing it. was: Steps to reproduce: * Create a tar or zip file with some arbitrary content in its root. The bug occurs only when using these two formats. * Create a flow: GetFile -> UnpackContent -> LogAttribute. * Set GetFile to fetch the compressed test file from the file system. Set UnpackContent to use the appropriate format. Unpackcontent writes an attribute "file.absolutePath" which is currently the relative path of the unpacked content concatenated with the current working directory (where Nifi has been run). E.g. if I unpacked a file with relative path "egg/ham.txt" and I run Nifi from "/usr/bin", "file.absolutePath" would be "/usr/bin/egg/ham.txt". In my opinion, "absolute path" does not make much sense in this context. I suggest removing it. > Unpackcontent writing absolute path property does not make sense > > > Key: NIFI-7643 > URL: https://issues.apache.org/jira/browse/NIFI-7643 > Project: Apache NiFi > Issue Type: Bug >Reporter: Tamás Bunth >Priority: Major > > Steps to reproduce: > # Create a tar or zip file with some arbitrary content in its root. 
The bug > occurs only when using these two formats. > # Create a flow: GetFile -> UnpackContent -> LogAttribute. > # Set GetFile to fetch the compressed test file from the file system. Set > UnpackContent to use the appropriate format. > Unpackcontent writes an attribute "file.absolutePath" which is currently the > relative path of the unpacked content concatenated with the current working > directory (where Nifi has been run). > E.g. if I unpacked a file with relative path "egg/ham.txt" and I run Nifi > from "/usr/bin", "file.absolutePath" would be "/usr/bin/egg/ham.txt". > In my opinion, "absolute path" does not make much sense in this context. I > suggest removing it. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (NIFI-7643) Unpackcontent writing absolute path property does not make sense
Tamás Bunth created NIFI-7643: - Summary: Unpackcontent writing absolute path property does not make sense Key: NIFI-7643 URL: https://issues.apache.org/jira/browse/NIFI-7643 Project: Apache NiFi Issue Type: Bug Reporter: Tamás Bunth Steps to reproduce: * Create a tar or zip file with some arbitrary content in its root. The bug occurs only when using these two formats. * Create a flow: GetFile -> UnpackContent -> LogAttribute. * Set GetFile to fetch the compressed test file from the file system. Set UnpackContent to use the appropriate format. Unpackcontent writes an attribute "file.absolutePath" which is currently the relative path of the unpacked content concatenated with the current working directory (where Nifi has been run). E.g. if I unpacked a file with relative path "egg/ham.txt" and I run Nifi from "/usr/bin", "file.absolutePath" would be "/usr/bin/egg/ham.txt". In my opinion, "absolute path" does not make much sense in this context. I suggest removing it. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [nifi-minifi-cpp] arpadboda commented on pull request #836: MINIFICPP-1248 Create unit tests for the ConsumeWindowsEventLog processor
arpadboda commented on pull request #836: URL: https://github.com/apache/nifi-minifi-cpp/pull/836#issuecomment-658855469 @fgerlits : The stringutils part seems to fail on gcc: https://travis-ci.org/github/apache/nifi-minifi-cpp/jobs/708347373 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi-minifi-cpp] arpadboda commented on pull request #821: MINIFICPP-1251 - Implement and test RetryFlowFile processor
arpadboda commented on pull request #821: URL: https://github.com/apache/nifi-minifi-cpp/pull/821#issuecomment-658816200 @hunyadi-dev : could you rebase and push? The changes look good to me, but I would like to see it build on Win without errors. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi] goosalex opened a new pull request #4410: NIFI-7642: Improve batching into FlowFiles under high latency conditions
goosalex opened a new pull request #4410: URL: https://github.com/apache/nifi/pull/4410 Thank you for submitting a contribution to Apache NiFi. Please provide a short description of the PR here: Description of PR Improves receiving large batches of Kafka messages per FlowFile when network latency is high. fixes bug NIFI-7642 Wires existing processor properties to: * Remove 1000 records "magic number" * Remove 10ms poll timeout "magic number" In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [x] Does your PR title start with **NIFI-** where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [x] Has your PR been rebased against the latest commit within the target branch (typically `main`)? - [x] Is your initial contribution a single, squashed commit? _Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not `squash` or use `--force` when pushing to allow for clean monitoring of changes._ ### For code changes: - [x] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder? - [x] Have you written or updated unit tests to verify your changes? - [x] Have you verified that the full build is successful on JDK 8? - [x] Have you verified that the full build is successful on JDK 11? - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the `LICENSE` file, including the main `LICENSE` file under `nifi-assembly`? - [ ] If applicable, have you updated the `NOTICE` file, including the main `NOTICE` file found under `nifi-assembly`? 
- [ ] If adding new Properties, have you added `.displayName` in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
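The PR above replaces two hard-coded batching limits (1000 records per FlowFile, a 10 ms poll timeout) with existing processor properties, so that high-latency links can use a longer timeout and accumulate larger batches. The actual change is in NiFi's Java Kafka processors; the sketch below only illustrates the shape of the fix in a generic form — `Record`, `BatchConfig`, and the `poll` callback are assumptions, not the real NiFi `ConsumerLease` code:

```cpp
#include <chrono>
#include <cstddef>
#include <functional>
#include <vector>

struct Record { /* payload omitted for the sketch */ };

// Both limits come from configuration instead of being compiled-in constants.
struct BatchConfig {
  std::size_t max_records = 1000;              // previously a magic number
  std::chrono::milliseconds poll_timeout{10};  // previously a magic number
};

// Keep polling until we have enough records for one FlowFile, the source
// runs dry, or the configured time budget is spent.
std::vector<Record> gatherBatch(
    const BatchConfig& cfg,
    const std::function<std::vector<Record>(std::chrono::milliseconds)>& poll) {
  std::vector<Record> batch;
  const auto deadline = std::chrono::steady_clock::now() + cfg.poll_timeout;
  while (batch.size() < cfg.max_records &&
         std::chrono::steady_clock::now() < deadline) {
    auto chunk = poll(cfg.poll_timeout);
    if (chunk.empty()) break;  // nothing more available right now
    batch.insert(batch.end(), chunk.begin(), chunk.end());
  }
  return batch;
}
```

With a 10 ms budget, a high-latency broker rarely delivers a full batch before the deadline; raising `poll_timeout` via configuration trades latency for fewer, larger FlowFiles, which is exactly the trade-off the PR exposes.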
[GitHub] [nifi-minifi-cpp] fgerlits opened a new pull request #840: MINIFICPP-1293 Fix the failing PropertyTests
fgerlits opened a new pull request #840: URL: https://github.com/apache/nifi-minifi-cpp/pull/840 See https://issues.apache.org/jira/browse/MINIFICPP-1293 for the description of the bug. Description of the change: instead of using `mktime` and adjusting by the time zone offset, use `_mkgmtime` on Windows where it is available, and use a hand-written version of `mkgmtime` elsewhere. --- Thank you for submitting a contribution to Apache NiFi - MiNiFi C++. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [ ] Does your PR title start with MINIFICPP- where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)? - [ ] Is your initial contribution a single, squashed commit? ### For code changes: - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file? - [ ] If applicable, have you updated the NOTICE file? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
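The PR describes using `_mkgmtime` on Windows and a hand-written `mkgmtime` elsewhere, avoiding the bug-prone pattern of calling `mktime` and adjusting by the time zone offset. A sketch of what such a hand-written UTC counterpart of `mktime` can look like, using Howard Hinnant's civil-date-to-days algorithm (this is illustrative, not the actual minifi implementation; `my_mkgmtime` is an assumed name):

```cpp
#include <cstdint>
#include <ctime>

// Days since 1970-01-01 for a civil date (Howard Hinnant's algorithm).
// Works for dates before and after the epoch.
int64_t days_from_civil(int y, int m, int d) {
  y -= m <= 2;  // shift so the year starts in March (leap day last)
  const int era = (y >= 0 ? y : y - 399) / 400;
  const unsigned yoe = static_cast<unsigned>(y - era * 400);            // [0, 399]
  const unsigned doy = (153 * (m + (m > 2 ? -3 : 9)) + 2) / 5 + d - 1;  // [0, 365]
  const unsigned doe = yoe * 365 + yoe / 4 - yoe / 100 + doy;           // [0, 146096]
  return static_cast<int64_t>(era) * 146097 + static_cast<int64_t>(doe) - 719468;
}

// UTC counterpart of mktime: interprets the struct tm as UTC, so no time
// zone offset adjustment (and no DST guesswork) is needed.
time_t my_mkgmtime(const struct tm* t) {
  const int64_t days =
      days_from_civil(t->tm_year + 1900, t->tm_mon + 1, t->tm_mday);
  return static_cast<time_t>(
      ((days * 24 + t->tm_hour) * 60 + t->tm_min) * 60 + t->tm_sec);
}
```

Because the computation never consults the local time zone database, it gives the same answer on every platform, which is the property the failing PropertyTests needed.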
[GitHub] [nifi] goosalex closed pull request #4409: Nifi 7642 - Wire KafkaConsumer properties instead of magic numbers to improve batching into fewer FlowFiles under high latency conditions
goosalex closed pull request #4409: URL: https://github.com/apache/nifi/pull/4409 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #833: MINIFICPP-1272 - Graceful shutdown on flow update
arpadboda commented on a change in pull request #833: URL: https://github.com/apache/nifi-minifi-cpp/pull/833#discussion_r455083609 ## File path: libminifi/src/core/ProcessGroup.cpp ## @@ -412,6 +414,18 @@ void ProcessGroup::drainConnections() { } } +std::size_t ProcessGroup::getTotalFlowFileCount() const { + std::size_t sum = 0; + for (auto& conn : connections_) { Review comment: Fair point, not sure which option is better, so feel free to choose. You can also mark this thread resolved. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
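The diff sums queue sizes with an explicit range-for; the alternative the thread may be weighing (it is not stated which options are meant, so this is a guess) is `std::accumulate`. A sketch of that variant — `Connection` here is a minimal stand-in with just the one accessor the sum needs, not the real minifi `Connection`:

```cpp
#include <cstddef>
#include <memory>
#include <numeric>
#include <vector>

// Minimal stand-in: the real minifi Connection exposes much more.
struct Connection {
  std::size_t queue_size = 0;
  std::size_t getQueueSize() const { return queue_size; }
};

// std::accumulate variant of ProcessGroup::getTotalFlowFileCount: same
// result as the explicit loop in the diff, expressed as a fold.
std::size_t totalFlowFileCount(
    const std::vector<std::shared_ptr<Connection>>& connections) {
  return std::accumulate(
      connections.begin(), connections.end(), std::size_t{0},
      [](std::size_t sum, const std::shared_ptr<Connection>& conn) {
        return sum + conn->getQueueSize();
      });
}
```

Either form is fine; the loop is arguably easier to step through in a debugger, while the fold makes the "no early exit, pure summation" intent explicit.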
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #833: MINIFICPP-1272 - Graceful shutdown on flow update
arpadboda commented on a change in pull request #833: URL: https://github.com/apache/nifi-minifi-cpp/pull/833#discussion_r455083106 ## File path: libminifi/test/flow-tests/FlowControllerTests.cpp ## @@ -119,7 +107,125 @@ TEST_CASE("Flow shutdown drains connections", "[TestFlow1]") { controller->stop(true); + REQUIRE(sinkProc->trigger_count == 0); + for (auto& it : connectionMap) { REQUIRE(it.second->isEmpty()); } } + +TEST_CASE("Flow shutdown waits for a while", "[TestFlow2]") { + TestControllerWithFlow testController(yamlConfig); + auto controller = testController.controller_; + auto root = testController.root_; + + auto sourceProc = std::static_pointer_cast(root->findProcessor("Generator")); + auto sinkProc = std::static_pointer_cast(root->findProcessor("TestProcessor")); + + // prevent the initial trigger + // in case the source got triggered + // and the scheduler triggers the sink + // before we could initiate the shutdown + sinkProc->yield(100); + + testController.startFlow(); + + // wait for the source processor to enqueue its flowFiles + std::this_thread::sleep_for(std::chrono::milliseconds{50}); + + REQUIRE(sourceProc->trigger_count.load() == 1); + REQUIRE(sinkProc->trigger_count.load() == 0); + + controller->stop(true); + + REQUIRE(sourceProc->trigger_count.load() == 1); + REQUIRE(sinkProc->trigger_count.load() == 3); +} + +TEST_CASE("Flow stopped after grace period", "[TestFlow3]") { + TestControllerWithFlow testController(yamlConfig); + auto controller = testController.controller_; + auto root = testController.root_; + + testController.configuration_->set(minifi::Configure::nifi_flowcontroller_drain_timeout, "1000 ms"); + + auto sourceProc = std::static_pointer_cast(root->findProcessor("Generator")); + auto sinkProc = std::static_pointer_cast(root->findProcessor("TestProcessor")); + + // prevent the initial trigger + // in case the source got triggered + // and the scheduler triggers the sink + sinkProc->yield(100); + + sinkProc->onTriggerCb_ = [&]{ 
+static std::atomic<bool> first_onTrigger{true}; +bool isFirst = true; +// sleep only on the first trigger +if (first_onTrigger.compare_exchange_strong(isFirst, false)) { + std::this_thread::sleep_for(std::chrono::milliseconds{1500}); +} + }; + + testController.startFlow(); + + // wait for the source processor to enqueue its flowFiles + std::this_thread::sleep_for(std::chrono::milliseconds{50}); Review comment: Sounds good to me, thanks!
[GitHub] [nifi] goosalex opened a new pull request #4409: Nifi 7642 - Wire KafkaConsumer properties instead of magic numbers to improve batching into fewer FlowFiles under high latency conditions
goosalex opened a new pull request #4409: URL: https://github.com/apache/nifi/pull/4409 Thank you for submitting a contribution to Apache NiFi. Please provide a short description of the PR here: Description of PR Improves receiving large batches of Kafka messages per FlowFile when network latency is high. fixes bug NIFI-7642 In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [x] Does your PR title start with **NIFI-XXXX** where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [x] Has your PR been rebased against the latest commit within the target branch (typically `main`)? - [x] Is your initial contribution a single, squashed commit? _Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not `squash` or use `--force` when pushing to allow for clean monitoring of changes._ ### For code changes: - [x] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder? - [x] Have you written or updated unit tests to verify your changes? - [x] Have you verified that the full build is successful on JDK 8? - [x] Have you verified that the full build is successful on JDK 11? - [-] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [-] If applicable, have you updated the `LICENSE` file, including the main `LICENSE` file under `nifi-assembly`? - [-] If applicable, have you updated the `NOTICE` file, including the main `NOTICE` file found under `nifi-assembly`?
- [-] If adding new Properties, have you added `.displayName` in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [-] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible.
[jira] [Updated] (NIFI-7628) Parameter context in multi tenant environment
[ https://issues.apache.org/jira/browse/NIFI-7628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Pierre Villard updated NIFI-7628: - Component/s: SDLC Flow Versioning > Parameter context in multi tenant environment > - > > Key: NIFI-7628 > URL: https://issues.apache.org/jira/browse/NIFI-7628 > Project: Apache NiFi > Issue Type: Improvement > Components: Flow Versioning, SDLC >Reporter: naveen kumar saharan >Priority: Major > > In a multi-tenant environment, it's not possible to restrict the parameters for > different BU/vertical/tenant. > Currently you can either grant access (view or modify) to a PG or not. > > But in a multi-tenant environment we want BU/vertical/tenant-specific > access on the parameter context so that people don't disturb each other's work; this is > also important with regard to security and compliance -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (NIFI-7628) Parameter context in multi tenant environment
[ https://issues.apache.org/jira/browse/NIFI-7628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17158178#comment-17158178 ] Pierre Villard commented on NIFI-7628: -- Not sure I understand the request here. A parameter context is attached to a process group and you can have a process group per BU / tenant. What is the issue? What would be the expected behavior? > Parameter context in multi tenant environment > - > > Key: NIFI-7628 > URL: https://issues.apache.org/jira/browse/NIFI-7628 > Project: Apache NiFi > Issue Type: Improvement > Components: Flow Versioning, SDLC >Reporter: naveen kumar saharan >Priority: Major > > In a multi-tenant environment, it's not possible to restrict the parameters for > different BU/vertical/tenant. > Currently you can either grant access (view or modify) to a PG or not. > > But in a multi-tenant environment we want BU/vertical/tenant-specific > access on the parameter context so that people don't disturb each other's work; this is > also important with regard to security and compliance
[jira] [Updated] (NIFI-7628) Parameter context in multi tenant environment
[ https://issues.apache.org/jira/browse/NIFI-7628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Pierre Villard updated NIFI-7628: - Issue Type: Improvement (was: Bug) > Parameter context in multi tenant environment > - > > Key: NIFI-7628 > URL: https://issues.apache.org/jira/browse/NIFI-7628 > Project: Apache NiFi > Issue Type: Improvement >Reporter: naveen kumar saharan >Priority: Major > > In a multi-tenant environment, it's not possible to restrict the parameters for > different BU/vertical/tenant. > Currently you can either grant access (view or modify) to a PG or not. > > But in a multi-tenant environment we want BU/vertical/tenant-specific > access on the parameter context so that people don't disturb each other's work; this is > also important with regard to security and compliance
[GitHub] [nifi-minifi-cpp] arpadboda closed pull request #838: MINIFICPP-1183 Fix Windows compile errors
arpadboda closed pull request #838: URL: https://github.com/apache/nifi-minifi-cpp/pull/838
[jira] [Updated] (NIFIREG-344) ClientUtils reads "Content-Disposition" before checking status code
[ https://issues.apache.org/jira/browse/NIFIREG-344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bryan Bende updated NIFIREG-344: Fix Version/s: (was: 0.7.0) > ClientUtils reads "Content-Disposition" before checking status code > --- > > Key: NIFIREG-344 > URL: https://issues.apache.org/jira/browse/NIFIREG-344 > Project: NiFi Registry > Issue Type: Bug >Affects Versions: 0.5.0 >Reporter: Bryan Bende >Assignee: Bryan Bende >Priority: Minor > > The issue is that if the response is something other than 200, the code still > tries to get the Content-Disposition header, which won't be there, and then > throws IllegalStateException; really there should have been a specific > exception for things like a 404.
[jira] [Updated] (NIFIREG-292) Add database implementations of UserGroupProvider and AccessPolicyProvider
[ https://issues.apache.org/jira/browse/NIFIREG-292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bryan Bende updated NIFIREG-292: Fix Version/s: (was: 0.7.0) > Add database implementations of UserGroupProvider and AccessPolicyProvider > -- > > Key: NIFIREG-292 > URL: https://issues.apache.org/jira/browse/NIFIREG-292 > Project: NiFi Registry > Issue Type: Improvement >Reporter: Bryan Bende >Assignee: Bryan Bende >Priority: Major > Time Spent: 2h 50m > Remaining Estimate: 0h > > We should offer database backed implementations of UserGroupProvider and > AccessPolicyProvider as an alternative to the file-based impls. We have LDAP > and Ranger alternatives, but for people not using those, the DB impls would > be a good way to get the data off the local filesystem.
[jira] [Resolved] (NIFIREG-321) Integrate nifi-registry-revsion into REST API and service layers
[ https://issues.apache.org/jira/browse/NIFIREG-321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bryan Bende resolved NIFIREG-321. - Resolution: Fixed > Integrate nifi-registry-revsion into REST API and service layers > > > Key: NIFIREG-321 > URL: https://issues.apache.org/jira/browse/NIFIREG-321 > Project: NiFi Registry > Issue Type: Improvement >Reporter: Bryan Bende >Assignee: Bryan Bende >Priority: Major > Fix For: 0.7.0 > > Time Spent: 3h > Remaining Estimate: 0h > > In NIFIREG-300 we set up the revision modules, which represent a library for > implementing optimistic locking using the same approach NiFi currently > implements. > This ticket is the follow-on work to integrate the revision concept into > NiFi Registry's REST API and service layer.
[jira] [Created] (NIFIREG-404) Perform release manager activities for 0.7.0 release
Bryan Bende created NIFIREG-404: --- Summary: Perform release manager activities for 0.7.0 release Key: NIFIREG-404 URL: https://issues.apache.org/jira/browse/NIFIREG-404 Project: NiFi Registry Issue Type: Task Reporter: Bryan Bende Assignee: Bryan Bende Fix For: 0.7.0
[jira] [Resolved] (NIFIREG-381) Skip integration tests in Github CI/Actions
[ https://issues.apache.org/jira/browse/NIFIREG-381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bryan Bende resolved NIFIREG-381. - Resolution: Fixed > Skip integration tests in Github CI/Actions > --- > > Key: NIFIREG-381 > URL: https://issues.apache.org/jira/browse/NIFIREG-381 > Project: NiFi Registry > Issue Type: Task >Reporter: Joe Witt >Assignee: Joe Witt >Priority: Major > Fix For: 0.7.0 > > Time Spent: 0.5h > Remaining Estimate: 0h >
[jira] [Updated] (NIFI-7642) KafkaConsumers: Improve batching into fewer FlowFiles under high latency conditions
[ https://issues.apache.org/jira/browse/NIFI-7642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alex Goos updated NIFI-7642: Summary: KafkaConsumers: Improve batching into fewer FlowFiles under high latency conditions (was: KafkaConsumers: Improve batching into fewer FlowFiles under high latency conditios) > KafkaConsumers: Improve batching into fewer FlowFiles under high latency > conditions > --- > > Key: NIFI-7642 > URL: https://issues.apache.org/jira/browse/NIFI-7642 > Project: Apache NiFi > Issue Type: Bug > Components: Extensions >Affects Versions: 1.3.0, 1.4.0, 1.5.0, 1.6.0, 1.7.0, 1.8.0, 1.7.1, 1.9.0, > 1.10.0, 1.9.1, 1.9.2, 1.11.0, 1.11.1, 1.11.2, 1.11.4 >Reporter: Alex Goos >Priority: Minor > Labels: kafka > > NIFI-3962 introduced two hard-coded magic numbers into KafkaConsumers: > * maxRecords is capped at 1000, regardless of the property setting > * the poll timeout is fixed at 10 ms > Under high throughput & high latency conditions, this leads to files that are > too small.
[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.
adamdebreceni commented on a change in pull request #807: URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r455004287 ## File path: libminifi/src/core/ProcessSession.cpp ## @@ -540,111 +481,97 @@ void ProcessSession::import(const std::string& source, std::vector buffer(getpagesize()); try { -try { - std::ifstream input{source, std::ios::in | std::ios::binary}; - logger_->log_debug("Opening %s", source); - if (!input.is_open() || !input.good()) { -throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: failed to open file \'", source, "\'")); +std::ifstream input{source, std::ios::in | std::ios::binary}; +logger_->log_debug("Opening %s", source); +if (!input.is_open() || !input.good()) { + throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: failed to open file \'", source, "\'")); +} +if (offset != 0U) { + input.seekg(offset, std::ifstream::beg); + if (!input.good()) { +logger_->log_error("Seeking to %lu failed for file %s (does file/filesystem support seeking?)", offset, source); +throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: Couldn't seek to offset ", std::to_string(offset))); } - if (offset != 0U) { -input.seekg(offset, std::ifstream::beg); -if (!input.good()) { - logger_->log_error("Seeking to %lu failed for file %s (does file/filesystem support seeking?)", offset, source); - throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: Couldn't seek to offset ", std::to_string(offset))); -} +} +uint64_t startTime = 0U; +while (input.good()) { + input.read(reinterpret_cast(buffer.data()), buffer.size()); + std::streamsize read = input.gcount(); + if (read < 0) { +throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount returned negative value"); } - uint64_t startTime = 0U; - while (input.good()) { -input.read(reinterpret_cast(buffer.data()), buffer.size()); -std::streamsize read = 
input.gcount(); -if (read < 0) { - throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount returned negative value"); -} -if (read == 0) { - logger_->log_trace("Finished reading input %s", source); + if (read == 0) { +logger_->log_trace("Finished reading input %s", source); +break; + } else { +logging::LOG_TRACE(logger_) << "Read input of " << read; + } + uint8_t* begin = buffer.data(); + uint8_t* end = begin + read; + while (true) { +startTime = getTimeMillis(); +uint8_t* delimiterPos = std::find(begin, end, static_cast(inputDelimiter)); +const auto len = gsl::narrow(delimiterPos - begin); + +logging::LOG_TRACE(logger_) << "Read input of " << read << " length is " << len << " is at end?" << (delimiterPos == end); +/* + * We do not want to process the rest of the buffer after the last delimiter if + * - we have reached EOF in the file (we would discard it anyway) + * - there is nothing to process (the last character in the buffer is a delimiter) + */ +if (delimiterPos == end && (input.eof() || len == 0)) { break; -} else { - logging::LOG_TRACE(logger_) << "Read input of " << read; } -uint8_t* begin = buffer.data(); -uint8_t* end = begin + read; -while (true) { - startTime = getTimeMillis(); - uint8_t* delimiterPos = std::find(begin, end, static_cast(inputDelimiter)); - const auto len = gsl::narrow(delimiterPos - begin); - - logging::LOG_TRACE(logger_) << "Read input of " << read << " length is " << len << " is at end?" 
<< (delimiterPos == end); - /* - * We do not want to process the rest of the buffer after the last delimiter if - * - we have reached EOF in the file (we would discard it anyway) - * - there is nothing to process (the last character in the buffer is a delimiter) - */ - if (delimiterPos == end && (input.eof() || len == 0)) { -break; - } - - /* Create claim and stream if needed and append data */ - if (claim == nullptr) { -startTime = getTimeMillis(); -claim = std::make_shared(process_context_->getContentRepository()); - } - if (stream == nullptr) { -stream = process_context_->getContentRepository()->write(claim); - } - if (stream == nullptr) { -logger_->log_error("Stream is null"); -rollback(); -return; - } - if (stream->write(begin, len) != len) { -logger_->log_error("Error while writ
[jira] [Created] (MINIFICPP-1294) Move stream ref counting to the ResourceClaim class
Adam Debreceni created MINIFICPP-1294: - Summary: Move stream ref counting to the ResourceClaim class Key: MINIFICPP-1294 URL: https://issues.apache.org/jira/browse/MINIFICPP-1294 Project: Apache NiFi MiNiFi C++ Issue Type: Improvement Reporter: Adam Debreceni Fix For: 1.0.0 Currently the FlowFile and others manually increment/decrement the refCount of each ResourceClaim. As each ResourceClaim also has a reference to the content repository (in the form of a StreamManager), it should be able to notify the manager when a ResourceClaim instance is created/destroyed. The only place requiring manual refCount adjustments should be when we Serialize/Deserialize the FlowFiles.
[jira] [Updated] (NIFI-7642) KafkaConsumers: Improve batching into fewer FlowFiles under high latency conditios
[ https://issues.apache.org/jira/browse/NIFI-7642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alex Goos updated NIFI-7642: Labels: kafka (was: ) > KafkaConsumers: Improve batching into fewer FlowFiles under high latency > conditios > -- > > Key: NIFI-7642 > URL: https://issues.apache.org/jira/browse/NIFI-7642 > Project: Apache NiFi > Issue Type: Bug > Components: Extensions >Affects Versions: 1.3.0, 1.4.0, 1.5.0, 1.6.0, 1.7.0, 1.8.0, 1.7.1, 1.9.0, > 1.10.0, 1.9.1, 1.9.2, 1.11.0, 1.11.1, 1.11.2, 1.11.4 >Reporter: Alex Goos >Priority: Minor > Labels: kafka > > NIFI-3962 introduced two hard-coded magic numbers into KafkaConsumers: > * maxRecords is capped at 1000, regardless of the property setting > * the poll timeout is fixed at 10 ms > Under high throughput & high latency conditions, this leads to files that are > too small.
[jira] [Created] (NIFI-7642) KafkaConsumers: Improve batching into fewer FlowFiles under high latency conditios
Alex Goos created NIFI-7642: --- Summary: KafkaConsumers: Improve batching into fewer FlowFiles under high latency conditios Key: NIFI-7642 URL: https://issues.apache.org/jira/browse/NIFI-7642 Project: Apache NiFi Issue Type: Bug Components: Extensions Affects Versions: 1.11.4, 1.11.2, 1.11.1, 1.11.0, 1.9.2, 1.9.1, 1.10.0, 1.9.0, 1.7.1, 1.8.0, 1.7.0, 1.6.0, 1.5.0, 1.4.0, 1.3.0 Reporter: Alex Goos NIFI-3962 introduced two hard-coded magic numbers into KafkaConsumers: * maxRecords is capped at 1000, regardless of the property setting * the poll timeout is fixed at 10 ms Under high throughput & high latency conditions, this leads to files that are too small.
[jira] [Created] (NIFI-7641) NullPointerException with ExecuteFlumeSource
Pierre Villard created NIFI-7641: Summary: NullPointerException with ExecuteFlumeSource Key: NIFI-7641 URL: https://issues.apache.org/jira/browse/NIFI-7641 Project: Apache NiFi Issue Type: Bug Components: Extensions Reporter: Pierre Villard Faced a NPE when using the ExecuteFlumeSource processor. Seems to be when the processor couldn't bind to the port as another processor was already using it. {code:java} 2020-07-14 11:40:54,373 INFO org.apache.flume.source.DefaultSourceFactory: Creating instance of source src-1, type avro 2020-07-14 11:40:54,374 INFO org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent: Scheduled ExecuteFlumeSource[id=b8103a4f-871d-12f4-8530-997105f8c85d] to run with 1 threads 2020-07-14 11:40:54,374 INFO org.apache.flume.source.AvroSource: Avro source src-1 stopping: Avro source src-1: { bindAddress: 0.0.0.0, port: 50100 } 2020-07-14 11:40:54,375 ERROR org.apache.nifi.processors.flume.ExecuteFlumeSource: ExecuteFlumeSource[id=b8103a4f-871d-12f4-8530-997105f8c85d] ExecuteFlumeSource[id=b8103a4f-871d-12f4-8530-997105f8c85d] failed to process session due to java.lang.NullPointerException; Processor Administratively Yielded for 1 sec: java.lang.NullPointerException java.lang.NullPointerException: null at org.apache.flume.source.AvroSource.stop(AvroSource.java:302) at org.apache.flume.source.EventDrivenSourceRunner.stop(EventDrivenSourceRunner.java:51) at org.apache.nifi.processors.flume.ExecuteFlumeSource.stopped(ExecuteFlumeSource.java:158) at org.apache.nifi.processors.flume.ExecuteFlumeSource.onTrigger(ExecuteFlumeSource.java:179) at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1176) at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:213) at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117) at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110) at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 2020-07-14 11:40:54,375 WARN org.apache.nifi.controller.tasks.ConnectableTask: Administratively Yielding ExecuteFlumeSource[id=b8103a4f-871d-12f4-8530-997105f8c85d] due to uncaught Exception: java.lang.NullPointerException java.lang.NullPointerException: null at org.apache.flume.source.AvroSource.stop(AvroSource.java:302) at org.apache.flume.source.EventDrivenSourceRunner.stop(EventDrivenSourceRunner.java:51) at org.apache.nifi.processors.flume.ExecuteFlumeSource.stopped(ExecuteFlumeSource.java:158) at org.apache.nifi.processors.flume.ExecuteFlumeSource.onTrigger(ExecuteFlumeSource.java:179) at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1176) at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:213) at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117) at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 2020-07-14 11:40:54,898 INFO org.apache.nifi.controller.StandardFlowService: Saved flow controller org.apache.nifi.controller.FlowController@3a534280 // Another save pending = false 2020-07-14 11:40:55,991 INFO org.apache.nifi.controller.scheduling.StandardProcessScheduler: Starting ExecuteFlumeSource[id=21e134ef-a08b-1570-89a5-064680f551a0] 2020-07-14 11:40:55,991 INFO org.apache.nifi.controller.StandardProcessorNode: Starting ExecuteFlumeSource[id=21e134ef-a08b-1570-89a5-064680f551a0] {code}
[jira] [Created] (NIFI-7640) HandleHttpRequest - define temporary files location
Pierre Villard created NIFI-7640: Summary: HandleHttpRequest - define temporary files location Key: NIFI-7640 URL: https://issues.apache.org/jira/browse/NIFI-7640 Project: Apache NiFi Issue Type: Improvement Components: Extensions Reporter: Pierre Villard A property should be added to the processor allowing users to define where temporary files are located.
[GitHub] [nifi] KuKuDeCheng commented on pull request #4326: NIFI-7519:Add a ExecuteSQL property on DBCPConnectionPool
KuKuDeCheng commented on pull request #4326: URL: https://github.com/apache/nifi/pull/4326#issuecomment-658667516 > I'm not sure about the GitHub checks error. > I ran this check successfully on my computer. You need to update DatabaseRecordSinkTest.java. Add your ExecuteSQL property like this: ` when(dbContext.getProperty(DATABASE_URL)).thenReturn(new MockPropertyValue("jdbc:derby:${DB_LOCATION}")) `
[jira] [Updated] (NIFI-7635) Fix StandardConfigurationContext to read the default property value from its ComponentNode instance
[ https://issues.apache.org/jira/browse/NIFI-7635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Peter Gyori updated NIFI-7635: -- Resolution: Fixed Status: Resolved (was: Patch Available) > Fix StandardConfigurationContext to read the default property value from its > ComponentNode instance > --- > > Key: NIFI-7635 > URL: https://issues.apache.org/jira/browse/NIFI-7635 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Reporter: Peter Gyori >Assignee: Peter Gyori >Priority: Major > Time Spent: 40m > Remaining Estimate: 0h > > If an effective property value is not set, > StandardConfigurationContext.getProperty(PropertyDescriptor property) reads > the default property value from its parameter (property) instead of the > ComponentNode instance. This results in faulty behavior when e.g. a > controller service is enabled with its default settings and one or more of > these default values differ from the default values of the supplied > PropertyDescriptor. > A fix needs to be implemented to get the property descriptor and its default > value from the component itself and not the supplied PropertyDescriptor > parameter.
[GitHub] [nifi] pgyori commented on pull request #4408: NIFI-7635: StandardConfigurationContext.getProperty() gets the proper…
pgyori commented on pull request #4408: URL: https://github.com/apache/nifi/pull/4408#issuecomment-658628455 Thank you, @markap14 ! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.
adamdebreceni commented on a change in pull request #807: URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r454874069 ## File path: libminifi/src/core/ProcessSession.cpp ## @@ -540,111 +481,97 @@ void ProcessSession::import(const std::string& source, std::vector buffer(getpagesize()); try { -try { - std::ifstream input{source, std::ios::in | std::ios::binary}; - logger_->log_debug("Opening %s", source); - if (!input.is_open() || !input.good()) { -throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: failed to open file \'", source, "\'")); +std::ifstream input{source, std::ios::in | std::ios::binary}; +logger_->log_debug("Opening %s", source); +if (!input.is_open() || !input.good()) { + throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: failed to open file \'", source, "\'")); +} +if (offset != 0U) { + input.seekg(offset, std::ifstream::beg); + if (!input.good()) { +logger_->log_error("Seeking to %lu failed for file %s (does file/filesystem support seeking?)", offset, source); +throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: Couldn't seek to offset ", std::to_string(offset))); } - if (offset != 0U) { -input.seekg(offset, std::ifstream::beg); -if (!input.good()) { - logger_->log_error("Seeking to %lu failed for file %s (does file/filesystem support seeking?)", offset, source); - throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: Couldn't seek to offset ", std::to_string(offset))); -} +} +uint64_t startTime = 0U; +while (input.good()) { + input.read(reinterpret_cast(buffer.data()), buffer.size()); + std::streamsize read = input.gcount(); + if (read < 0) { +throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount returned negative value"); } - uint64_t startTime = 0U; - while (input.good()) { -input.read(reinterpret_cast(buffer.data()), buffer.size()); -std::streamsize read = 
input.gcount(); -if (read < 0) { - throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount returned negative value"); -} -if (read == 0) { - logger_->log_trace("Finished reading input %s", source); + if (read == 0) { +logger_->log_trace("Finished reading input %s", source); +break; + } else { +logging::LOG_TRACE(logger_) << "Read input of " << read; + } + uint8_t* begin = buffer.data(); + uint8_t* end = begin + read; + while (true) { +startTime = getTimeMillis(); +uint8_t* delimiterPos = std::find(begin, end, static_cast(inputDelimiter)); +const auto len = gsl::narrow(delimiterPos - begin); + +logging::LOG_TRACE(logger_) << "Read input of " << read << " length is " << len << " is at end?" << (delimiterPos == end); +/* + * We do not want to process the rest of the buffer after the last delimiter if + * - we have reached EOF in the file (we would discard it anyway) + * - there is nothing to process (the last character in the buffer is a delimiter) + */ +if (delimiterPos == end && (input.eof() || len == 0)) { break; -} else { - logging::LOG_TRACE(logger_) << "Read input of " << read; } -uint8_t* begin = buffer.data(); -uint8_t* end = begin + read; -while (true) { - startTime = getTimeMillis(); - uint8_t* delimiterPos = std::find(begin, end, static_cast(inputDelimiter)); - const auto len = gsl::narrow(delimiterPos - begin); - - logging::LOG_TRACE(logger_) << "Read input of " << read << " length is " << len << " is at end?" 
<< (delimiterPos == end); - /* - * We do not want to process the rest of the buffer after the last delimiter if - * - we have reached EOF in the file (we would discard it anyway) - * - there is nothing to process (the last character in the buffer is a delimiter) - */ - if (delimiterPos == end && (input.eof() || len == 0)) { -break; - } - - /* Create claim and stream if needed and append data */ - if (claim == nullptr) { -startTime = getTimeMillis(); -claim = std::make_shared(process_context_->getContentRepository()); - } - if (stream == nullptr) { -stream = process_context_->getContentRepository()->write(claim); - } - if (stream == nullptr) { -logger_->log_error("Stream is null"); -rollback(); -return; - } - if (stream->write(begin, len) != len) { -logger_->log_error("Error while writ
[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.
adamdebreceni commented on a change in pull request #807: URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r454871893 ## File path: extensions/rocksdb-repos/FlowFileRepository.cpp ## @@ -148,22 +148,27 @@ void FlowFileRepository::prune_stored_flowfiles() { std::string key = it->key().ToString(); if (eventRead->DeSerialize(reinterpret_cast(it->value().data()), it->value().size())) { logger_->log_debug("Found connection for %s, path %s ", eventRead->getConnectionUuid(), eventRead->getContentFullPath()); - auto search = connectionMap.find(eventRead->getConnectionUuid()); - if (!corrupt_checkpoint && search != connectionMap.end()) { + bool found = false; + auto search = containers.find(eventRead->getConnectionUuid()); + found = (search != containers.end()); + if (!found) { +// for backward compatibility +search = connectionMap.find(eventRead->getConnectionUuid()); +found = (search != connectionMap.end()); + } Review comment: done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (NIFI-6061) PutDatabaseRecord does not properly handle BLOB/CLOB fields
[ https://issues.apache.org/jira/browse/NIFI-6061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17157981#comment-17157981 ] ZhangCheng commented on NIFI-6061: -- For an Oracle TIMESTAMP field, when PutDatabaseRecord tries to insert it via setObject(), it throws exception ORA-01843: not a valid month. I think this exception will be resolved after fixing this bug. > PutDatabaseRecord does not properly handle BLOB/CLOB fields > --- > > Key: NIFI-6061 > URL: https://issues.apache.org/jira/browse/NIFI-6061 > Project: Apache NiFi > Issue Type: Bug > Components: Extensions >Reporter: Matt Burgess >Priority: Major > > BLOB/CLOB fields in NiFi's Record API are returned from the record as > Object[Byte], but when PutDatabaseRecord tries to insert Object[] via > setObject(), the following error occurs: > 2019-02-20 15:11:16,216 WARN [Timer-Driven Process Thread-10] > o.a.n.p.standard.PutDatabaseRecord > PutDatabaseRecord[id=0c84b9de-0169-1000-0164-3fbad7a17664] Failed to process > StandardFlowFileRecord[uuid=d739f432-0871-41bb-a0c9-d6ceeac68a6d,claim=StandardContentClaim > [resourceClaim=StandardResourceClaim[id=155069058-1, container=default, > section=1], offset=1728, > length=251],offset=0,name=d739f432-0871-41bb-a0c9-d6ceeac68a6d,size=251] due > to org.postgresql.util.PSQLException: Can't infer the SQL type to use for an > instance of [Ljava.lang.Object;. Use setObject() with an explicit Types value > to specify the type to use.: > Somewhere in the value conversion/representation, PutDatabaseRecord would > likely need to create a java.sql.Blob object and transfer the bytes into it. > One issue I see is that the record field type has been converted to > Array[Byte], so the information that the field is a BLOB is lost by that > point. If this requires DB-specific code, we'd likely need to add a Database > Adapter property and delegate out to the various DB adapters. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.
adamdebreceni commented on a change in pull request #807: URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r454824460 ## File path: libminifi/include/core/Repository.h ## @@ -228,6 +232,8 @@ class Repository : public virtual core::SerializableComponent, public core::Trac Repository &operator=(const Repository &parent) = delete; protected: + std::map> containers; Review comment: currently `Connectable` declares the `put` method, meaning that `Connectable`s are exactly what can be containers, later I would like if we had a `FlowFileContainer` class and separate this capability out This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.
adamdebreceni commented on a change in pull request #807: URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r454842291 ## File path: libminifi/include/core/FlowFile.h ## @@ -35,9 +35,56 @@ namespace minifi { namespace core { class FlowFile : public core::Connectable, public ReferenceContainer { + private: + class FlowFileOwnedResourceClaimPtr{ + public: +FlowFileOwnedResourceClaimPtr() = default; +explicit FlowFileOwnedResourceClaimPtr(const std::shared_ptr& claim) : claim_(claim) { + if (claim_) claim_->increaseFlowFileRecordOwnedCount(); +} +explicit FlowFileOwnedResourceClaimPtr(std::shared_ptr&& claim) : claim_(std::move(claim)) { + if (claim_) claim_->increaseFlowFileRecordOwnedCount(); +} +FlowFileOwnedResourceClaimPtr(const FlowFileOwnedResourceClaimPtr& ref) : claim_(ref.claim_) { + if (claim_) claim_->increaseFlowFileRecordOwnedCount(); +} +FlowFileOwnedResourceClaimPtr(FlowFileOwnedResourceClaimPtr&& ref) : claim_(std::move(ref.claim_)) { + // taking ownership of claim, no need to increment/decrement +} +FlowFileOwnedResourceClaimPtr& operator=(const FlowFileOwnedResourceClaimPtr& ref) = delete; +FlowFileOwnedResourceClaimPtr& operator=(FlowFileOwnedResourceClaimPtr&& ref) = delete; + +FlowFileOwnedResourceClaimPtr& set(FlowFile& owner, const FlowFileOwnedResourceClaimPtr& ref) { + return set(owner, ref.claim_); +} +FlowFileOwnedResourceClaimPtr& set(FlowFile& owner, const std::shared_ptr& newClaim) { + auto oldClaim = claim_; + claim_ = newClaim; + // the order of increase/release is important + if (claim_) claim_->increaseFlowFileRecordOwnedCount(); + if (oldClaim) owner.releaseClaim(oldClaim); Review comment: with refcount manipulation we always increment first, then decrement as this way we don't accidentally discard the object under ourselves, note that an equality check will not suffice as two `ResourceClaim` instances can reference the same file (they may have the same contentPath) This is an automated message from the Apache Git 
Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.
adamdebreceni commented on a change in pull request #807: URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r454839615 ## File path: libminifi/test/persistence-tests/PersistenceTests.cpp ## @@ -0,0 +1,218 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include +#include +#include +#include +#include + +#include "core/Core.h" +#include "core/repository/AtomicRepoEntries.h" +#include "core/RepositoryFactory.h" +#include "FlowFileRecord.h" +#include "FlowFileRepository.h" +#include "properties/Configure.h" +#include "../unit/ProvenanceTestHelper.h" +#include "../TestBase.h" +#include "../../extensions/libarchive/MergeContent.h" +#include "../test/BufferReader.h" + +using Connection = minifi::Connection; +using MergeContent = minifi::processors::MergeContent; + +struct TestFlow{ + TestFlow(const std::shared_ptr& ff_repository, const std::shared_ptr& content_repo, const std::shared_ptr& prov_repo) + : ff_repository(ff_repository), content_repo(content_repo), prov_repo(prov_repo) { +std::shared_ptr controller_services_provider = nullptr; + +// setup MERGE processor +{ + merge = std::make_shared("MergeContent", mergeProcUUID()); + merge->initialize(); + merge->setAutoTerminatedRelationships({{"original", "d"}}); + + merge->setProperty(MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE); + merge->setProperty(MergeContent::MergeStrategy, MERGE_STRATEGY_BIN_PACK); + merge->setProperty(MergeContent::DelimiterStrategy, DELIMITER_STRATEGY_TEXT); + merge->setProperty(MergeContent::MinEntries, "3"); + merge->setProperty(MergeContent::Header, "_Header_"); + merge->setProperty(MergeContent::Footer, "_Footer_"); + merge->setProperty(MergeContent::Demarcator, "_Demarcator_"); + merge->setProperty(MergeContent::MaxBinAge, "1 h"); + + std::shared_ptr node = std::make_shared(merge); + mergeContext = std::make_shared(node, controller_services_provider, prov_repo, ff_repository, content_repo); +} + +// setup INPUT processor +{ + inputProcessor = std::make_shared("source", inputProcUUID()); + std::shared_ptr node = std::make_shared(inputProcessor); + inputContext = std::make_shared(node, controller_services_provider, prov_repo, +ff_repository, content_repo); +} + +// setup Input Connection +{ + input = 
std::make_shared(ff_repository, content_repo, "Input", inputConnUUID()); + input->setRelationship({"input", "d"}); + input->setDestinationUUID(mergeProcUUID()); + input->setSourceUUID(inputProcUUID()); + inputProcessor->addConnection(input); +} + +// setup Output Connection +{ + output = std::make_shared(ff_repository, content_repo, "Output", outputConnUUID()); + output->setRelationship(MergeContent::Merge); + output->setSourceUUID(mergeProcUUID()); +} + +// setup ProcessGroup +{ + root = std::make_shared(core::ProcessGroupType::ROOT_PROCESS_GROUP, "root"); + root->addProcessor(merge); + root->addConnection(input); + root->addConnection(output); +} + +// prepare Merge Processor for execution +merge->setScheduledState(core::ScheduledState::RUNNING); +merge->onSchedule(mergeContext.get(), new core::ProcessSessionFactory(mergeContext)); + } + void write(const std::string& data) { +minifi::io::DataStream stream(reinterpret_cast(data.c_str()), data.length()); +core::ProcessSession sessionGenFlowFile(inputContext); +std::shared_ptr flow = std::static_pointer_cast(sessionGenFlowFile.create()); +sessionGenFlowFile.importFrom(stream, flow); +sessionGenFlowFile.transfer(flow, {"input", "d"}); +sessionGenFlowFile.commit(); Review comment: done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.
adamdebreceni commented on a change in pull request #807: URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r454836324 ## File path: libminifi/src/core/ProcessSession.cpp ## @@ -540,111 +481,97 @@ void ProcessSession::import(const std::string& source, std::vector buffer(getpagesize()); try { -try { - std::ifstream input{source, std::ios::in | std::ios::binary}; - logger_->log_debug("Opening %s", source); - if (!input.is_open() || !input.good()) { -throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: failed to open file \'", source, "\'")); +std::ifstream input{source, std::ios::in | std::ios::binary}; +logger_->log_debug("Opening %s", source); +if (!input.is_open() || !input.good()) { + throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: failed to open file \'", source, "\'")); +} +if (offset != 0U) { + input.seekg(offset, std::ifstream::beg); + if (!input.good()) { +logger_->log_error("Seeking to %lu failed for file %s (does file/filesystem support seeking?)", offset, source); +throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: Couldn't seek to offset ", std::to_string(offset))); } - if (offset != 0U) { -input.seekg(offset, std::ifstream::beg); -if (!input.good()) { - logger_->log_error("Seeking to %lu failed for file %s (does file/filesystem support seeking?)", offset, source); - throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: Couldn't seek to offset ", std::to_string(offset))); -} +} +uint64_t startTime = 0U; +while (input.good()) { + input.read(reinterpret_cast(buffer.data()), buffer.size()); + std::streamsize read = input.gcount(); + if (read < 0) { +throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount returned negative value"); } - uint64_t startTime = 0U; - while (input.good()) { -input.read(reinterpret_cast(buffer.data()), buffer.size()); -std::streamsize read = 
input.gcount(); -if (read < 0) { - throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount returned negative value"); -} -if (read == 0) { - logger_->log_trace("Finished reading input %s", source); + if (read == 0) { +logger_->log_trace("Finished reading input %s", source); +break; + } else { +logging::LOG_TRACE(logger_) << "Read input of " << read; + } + uint8_t* begin = buffer.data(); + uint8_t* end = begin + read; + while (true) { +startTime = getTimeMillis(); +uint8_t* delimiterPos = std::find(begin, end, static_cast(inputDelimiter)); +const auto len = gsl::narrow(delimiterPos - begin); + +logging::LOG_TRACE(logger_) << "Read input of " << read << " length is " << len << " is at end?" << (delimiterPos == end); +/* + * We do not want to process the rest of the buffer after the last delimiter if + * - we have reached EOF in the file (we would discard it anyway) + * - there is nothing to process (the last character in the buffer is a delimiter) + */ +if (delimiterPos == end && (input.eof() || len == 0)) { break; -} else { - logging::LOG_TRACE(logger_) << "Read input of " << read; } -uint8_t* begin = buffer.data(); -uint8_t* end = begin + read; -while (true) { - startTime = getTimeMillis(); - uint8_t* delimiterPos = std::find(begin, end, static_cast(inputDelimiter)); - const auto len = gsl::narrow(delimiterPos - begin); - - logging::LOG_TRACE(logger_) << "Read input of " << read << " length is " << len << " is at end?" 
<< (delimiterPos == end); - /* - * We do not want to process the rest of the buffer after the last delimiter if - * - we have reached EOF in the file (we would discard it anyway) - * - there is nothing to process (the last character in the buffer is a delimiter) - */ - if (delimiterPos == end && (input.eof() || len == 0)) { -break; - } - - /* Create claim and stream if needed and append data */ - if (claim == nullptr) { -startTime = getTimeMillis(); -claim = std::make_shared(process_context_->getContentRepository()); - } - if (stream == nullptr) { -stream = process_context_->getContentRepository()->write(claim); - } - if (stream == nullptr) { -logger_->log_error("Stream is null"); -rollback(); -return; - } - if (stream->write(begin, len) != len) { -logger_->log_error("Error while writ
[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.
adamdebreceni commented on a change in pull request #807: URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r454834083 ## File path: libminifi/src/FlowFileRecord.cpp ## @@ -118,28 +110,27 @@ FlowFileRecord::~FlowFileRecord() { logger_->log_debug("Delete FlowFile UUID %s", uuidStr_); else logger_->log_debug("Delete SnapShot FlowFile UUID %s", uuidStr_); - if (claim_) { -releaseClaim(claim_); - } else { + + if (!claim_) { logger_->log_debug("Claim is null ptr for %s", uuidStr_); } + claim_.set(*this, nullptr); + // Disown stash claims - for (const auto &stashPair : stashedContent_) { -releaseClaim(stashPair.second); + for (auto &stashPair : stashedContent_) { +auto& stashClaim = stashPair.second; +stashClaim.set(*this, nullptr); } } void FlowFileRecord::releaseClaim(std::shared_ptr claim) { // Decrease the flow file record owned count for the resource claim - claim_->decreaseFlowFileRecordOwnedCount(); - std::string value; - logger_->log_debug("Delete Resource Claim %s, %s, attempt %llu", getUUIDStr(), claim_->getContentFullPath(), claim_->getFlowFileRecordOwnedCount()); - if (claim_->getFlowFileRecordOwnedCount() <= 0) { -// we cannot rely on the stored variable here since we aren't guaranteed atomicity -if (flow_repository_ != nullptr && !flow_repository_->Get(uuidStr_, value)) { - logger_->log_debug("Delete Resource Claim %s", claim_->getContentFullPath()); - content_repo_->remove(claim_); + claim->decreaseFlowFileRecordOwnedCount(); + logger_->log_debug("Detaching Resource Claim %s, %s, attempt %llu", getUUIDStr(), claim->getContentFullPath(), claim->getFlowFileRecordOwnedCount()); Review comment: done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.
adamdebreceni commented on a change in pull request #807: URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r454833537 ## File path: libminifi/src/core/ProcessSession.cpp ## @@ -248,35 +237,27 @@ void ProcessSession::penalize(const std::shared_ptr &flow) { void ProcessSession::transfer(const std::shared_ptr &flow, Relationship relationship) { logging::LOG_INFO(logger_) << "Transferring " << flow->getUUIDStr() << " from " << process_context_->getProcessorNode()->getName() << " to relationship " << relationship.getName(); _transferRelationship[flow->getUUIDStr()] = relationship; + flow->setDeleted(false); Review comment: during `commit` and `rollback` we check if the items in `_deletedFlowFiles` indeed stayed deleted, or a transfer or add marked them for "resurrection" This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org