[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #835: MINIFICPP-1281 - Improve test performance by using event polling instead of sleep by sync

2020-07-15 Thread GitBox


hunyadi-dev commented on a change in pull request #835:
URL: https://github.com/apache/nifi-minifi-cpp/pull/835#discussion_r41303



##
File path: extensions/http-curl/tests/HttpGetIntegrationTest.cpp
##
@@ -74,6 +71,8 @@ class HttpResponder : public CivetHandler {
 };
 
 int main(int argc, char **argv) {
+  using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;

Review comment:
   I don't think we have a test namespace. Some test utilities are in `utils` 
and some (e.g. our whole test plan) are in global scope.
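
   For readers following along: the helper imported in the diff polls the test 
log until an expected line shows up, instead of sleeping for a fixed time. A 
hedged usage sketch follows; only the helper's name comes from the diff, its 
signature here is an assumption for illustration:

```cpp
#include <cassert>
#include <chrono>

// Assumed signature; only the name comes from the using-declaration above.
using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;

int main() {
  // Wait up to 10 seconds for the expected line to appear in the test log,
  // instead of sleeping for a fixed time and asserting afterwards.
  // (Both the timeout and the log message are placeholders.)
  assert(verifyLogLinePresenceInPollTime(std::chrono::seconds(10),
                                         "expected log line"));
  return 0;
}
```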





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #835: MINIFICPP-1281 - Improve test performance by using event polling instead of sleep by sync

2020-07-15 Thread GitBox


hunyadi-dev commented on a change in pull request #835:
URL: https://github.com/apache/nifi-minifi-cpp/pull/835#discussion_r455548728



##
File path: extensions/http-curl/tests/HTTPSiteToSiteTests.cpp
##
@@ -81,7 +81,10 @@ class SiteToSiteTestHarness : public CoapIntegrationBase {
 
   void cleanup() override {}
 
-  void runAssertions() override {}
+  void runAssertions() override {
+// There is nothing to verify here, but we are expected to wait for all parallel events to execute
+std::this_thread::sleep_for(std::chrono::milliseconds(wait_time_));

Review comment:
   We don't want to bring that function back, because it would just be abused 
again. It would also probably wait at an incorrect place with this PR's 
design. Hopefully we can get rid of these declarations after rebasing on Adam 
D.'s PR.
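
   As background for the sleep-vs-poll discussion, here is a minimal sketch of 
what a poll-based wait utility can look like. This is an assumption for 
illustration; the actual helper used in this PR may differ in signature and 
behaviour:

```cpp
#include <chrono>
#include <functional>
#include <thread>

// Poll a predicate until it returns true or the timeout expires.
// Polling keeps test latency close to the actual event time, unlike a
// fixed sleep that always pays the worst-case duration.
bool verifyEventHappenedInPollTime(std::chrono::milliseconds timeout,
                                   const std::function<bool()>& event_happened,
                                   std::chrono::milliseconds poll_interval = std::chrono::milliseconds(50)) {
  const auto deadline = std::chrono::steady_clock::now() + timeout;
  while (std::chrono::steady_clock::now() < deadline) {
    if (event_happened()) {
      return true;
    }
    std::this_thread::sleep_for(poll_interval);
  }
  return event_happened();  // one final check at the deadline
}
```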





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] hunyadi-dev commented on a change in pull request #835: MINIFICPP-1281 - Improve test performance by using event polling instead of sleep by sync

2020-07-15 Thread GitBox


hunyadi-dev commented on a change in pull request #835:
URL: https://github.com/apache/nifi-minifi-cpp/pull/835#discussion_r455547307



##
File path: extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
##
@@ -126,34 +125,34 @@ int main(int argc, char **argv) {
   // now let's disable one of the controller services.
std::shared_ptr<core::controller::ControllerServiceNode> cs_id = 
controller->getControllerServiceNode("ID");
   assert(cs_id != nullptr);
-  {
-std::lock_guard lock(control_mutex);
-controller->disableControllerService(cs_id);
-disabled = true;
-waitToVerifyProcessor();
-  }
   {
 std::lock_guard lock(control_mutex);
 controller->enableControllerService(cs_id);
 disabled = false;
-waitToVerifyProcessor();
   }
std::shared_ptr<core::controller::ControllerServiceNode> mock_cont = 
controller->getControllerServiceNode("MockItLikeIts1995");
-  assert(cs_id->enabled());
-{
+  const bool test_success_01 = 
verifyEventHappenedInPollTime(std::chrono::seconds(4), [&cs_id] {
+return cs_id->enabled();
+  });
+  {
 std::lock_guard lock(control_mutex);
 controller->disableReferencingServices(mock_cont);
 disabled = true;
-waitToVerifyProcessor();
   }
-assert(cs_id->enabled() == false);
-{
+  const bool test_success_02 = 
verifyEventHappenedInPollTime(std::chrono::seconds(2), [&cs_id] {
+return !cs_id->enabled();
+  });
+  {
 std::lock_guard lock(control_mutex);
 controller->enableReferencingServices(mock_cont);
 disabled = false;
-waitToVerifyProcessor();
   }
-  assert(cs_id->enabled() == true);
+  const bool test_success_03 = 
verifyEventHappenedInPollTime(std::chrono::seconds(2), [&cs_id] {

Review comment:
   Will extract them out into a single lambda function.
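
   For reference, a hypothetical sketch of that extraction (the names below are 
invented here; only `verifyEventHappenedInPollTime` and `cs_id` come from the 
diff):

```cpp
// Hypothetical sketch: one reusable predicate factory replaces the three
// inline lambdas from the diff above.
const auto cs_id_enabled_matches = [&cs_id](bool expected) {
  return [&cs_id, expected] { return cs_id->enabled() == expected; };
};

const bool test_success_01 =
    verifyEventHappenedInPollTime(std::chrono::seconds(4), cs_id_enabled_matches(true));
const bool test_success_02 =
    verifyEventHappenedInPollTime(std::chrono::seconds(2), cs_id_enabled_matches(false));
const bool test_success_03 =
    verifyEventHappenedInPollTime(std::chrono::seconds(2), cs_id_enabled_matches(true));
```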





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #840: MINIFICPP-1293 Fix the failing PropertyTests

2020-07-15 Thread GitBox


arpadboda commented on a change in pull request #840:
URL: https://github.com/apache/nifi-minifi-cpp/pull/840#discussion_r455522730



##
File path: libminifi/test/unit/TimeUtilTests.cpp
##
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/TimeUtil.h"
+#include "../TestBase.h"
+
+namespace {
+  constexpr int ONE_HOUR = 60 * 60;
+  constexpr int ONE_DAY = 24 * ONE_HOUR;
+
+  struct tm createTm(int year, int month, int day, int hour, int minute, int 
second, bool is_dst = false) {
+struct tm date_time;
+date_time.tm_year = year - 1900;
+date_time.tm_mon = month - 1;

Review comment:
   Do I understand correctly that months are stored in the [0-11] range, but 
days are stored in [1-31]?
   I know that this representation is not your idea, it just looks a bit weird :)
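
   That is indeed the C standard convention for `struct tm`: `tm_mon` is 
0-based and `tm_year` counts years since 1900, while `tm_mday` is 1-based. A 
small self-contained illustration:

```cpp
#include <cassert>
#include <ctime>

int main() {
  struct tm date_time{};            // zero-initialize all fields
  // 2020-07-15 12:30:00 expressed with the standard struct tm conventions:
  date_time.tm_year = 2020 - 1900;  // tm_year counts years since 1900
  date_time.tm_mon = 7 - 1;         // tm_mon is 0-based: July == 6
  date_time.tm_mday = 15;           // tm_mday is 1-based: [1, 31]
  date_time.tm_hour = 12;
  date_time.tm_min = 30;
  date_time.tm_sec = 0;
  assert(date_time.tm_mon == 6 && date_time.tm_mday == 15);
  return 0;
}
```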





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] arpadboda closed pull request #836: MINIFICPP-1248 Create unit tests for the ConsumeWindowsEventLog processor

2020-07-15 Thread GitBox


arpadboda closed pull request #836:
URL: https://github.com/apache/nifi-minifi-cpp/pull/836


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] esecules commented on pull request #4401: NIFI-7622 Use param context name from inside nested versioned PG when…

2020-07-15 Thread GitBox


esecules commented on pull request #4401:
URL: https://github.com/apache/nifi/pull/4401#issuecomment-659026103


   This exception only gets thrown when you're pulling the flow from the 
registry. It would be better if it blocked versioning it to the registry in the 
first place.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (NIFI-7645) Implement AMQPRecord processors

2020-07-15 Thread Peter Turcsanyi (Jira)


 [ 
https://issues.apache.org/jira/browse/NIFI-7645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Peter Turcsanyi updated NIFI-7645:
--
Description: 
Reimplement PublishAMQP / ConsumeAMQP processors
 * support record oriented processing
 * overcome the issues with the current AMQP processors, like NIFI-6312 and 
NIFI-5896

> Implement AMQPRecord processors
> ---
>
> Key: NIFI-7645
> URL: https://issues.apache.org/jira/browse/NIFI-7645
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Reporter: Peter Turcsanyi
>Assignee: Peter Turcsanyi
>Priority: Major
>
> Reimplement PublishAMQP / ConsumeAMQP processors
>  * support record oriented processing
>  * overcome the issues with the current AMQP processors, like NIFI-6312 and 
> NIFI-5896



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (NIFI-7645) Implement AMQPRecord processors

2020-07-15 Thread Peter Turcsanyi (Jira)


 [ 
https://issues.apache.org/jira/browse/NIFI-7645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Peter Turcsanyi updated NIFI-7645:
--
Component/s: Extensions

> Implement AMQPRecord processors
> ---
>
> Key: NIFI-7645
> URL: https://issues.apache.org/jira/browse/NIFI-7645
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Extensions
>Reporter: Peter Turcsanyi
>Assignee: Peter Turcsanyi
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (NIFI-6312) AMQP processors seem to have thread cleanup issues

2020-07-15 Thread Peter Turcsanyi (Jira)


[ 
https://issues.apache.org/jira/browse/NIFI-6312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17158707#comment-17158707
 ] 

Peter Turcsanyi commented on NIFI-6312:
---

Opened a PR ([https://github.com/apache/nifi/pull/4411]) with fixes for the 
most critical connection handling / threading issues.

Also created a jira for reimplementing AMQP processors with record-oriented 
processing support (NIFI-7645).

> AMQP processors seem to have thread cleanup issues
> --
>
> Key: NIFI-6312
> URL: https://issues.apache.org/jira/browse/NIFI-6312
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Extensions
>Affects Versions: 1.9.0
>Reporter: Robert Bruno
>Assignee: Peter Turcsanyi
>Priority: Major
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> At a minimum the ConsumeAMQP processor exhibits this behavior but the 
> PublishAMQP may as well.
> If ConsumeAMQP is listening to a working AMQP server and then that server 
> name is no longer resolvable errors begin to show up in logs saying the 
> hostname can't be resolved.  This is expected.
> What isn't expected is if you then turn off the processor or even delete the 
> processor the error message persists.  The only way to resolve this is 
> restarting the nifi node.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (NIFI-6312) AMQP processors seem to have thread cleanup issues

2020-07-15 Thread Peter Turcsanyi (Jira)


 [ 
https://issues.apache.org/jira/browse/NIFI-6312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Peter Turcsanyi updated NIFI-6312:
--
Status: Patch Available  (was: Open)

> AMQP processors seem to have thread cleanup issues
> --
>
> Key: NIFI-6312
> URL: https://issues.apache.org/jira/browse/NIFI-6312
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Extensions
>Affects Versions: 1.9.0
>Reporter: Robert Bruno
>Assignee: Peter Turcsanyi
>Priority: Major
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> At a minimum the ConsumeAMQP processor exhibits this behavior but the 
> PublishAMQP may as well.
> If ConsumeAMQP is listening to a working AMQP server and then that server 
> name is no longer resolvable errors begin to show up in logs saying the 
> hostname can't be resolved.  This is expected.
> What isn't expected is if you then turn off the processor or even delete the 
> processor the error message persists.  The only way to resolve this is 
> restarting the nifi node.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [nifi] turcsanyip opened a new pull request #4411: NIFI-6312: Improved connection handling in AMQP processors

2020-07-15 Thread GitBox


turcsanyip opened a new pull request #4411:
URL: https://github.com/apache/nifi/pull/4411


   Disable the client's automatic connection recovery, which can lead to 
uncontrolled/stale threads, and handle the recovery in the processors instead.
   Use poisoning in case of errors: the poisoned consumer/publisher is then 
discarded and recreated.
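
   To illustrate the poisoning pattern described above, here is a C++ sketch 
(the actual NiFi change is Java, and every name in this sketch is 
hypothetical): on error the worker marks itself poisoned, and callers discard 
and recreate it instead of relying on the client library's automatic recovery.

```cpp
#include <exception>
#include <functional>
#include <memory>

// Hypothetical worker: on any error it marks itself poisoned instead of
// trying to self-heal via the client library's automatic recovery.
class AmqpWorker {
 public:
  void publish(const std::function<void()>& amqp_operation) {
    try {
      amqp_operation();
    } catch (const std::exception&) {
      poisoned_ = true;  // mark unusable for all future calls
      throw;
    }
  }
  bool poisoned() const { return poisoned_; }

 private:
  bool poisoned_{false};
};

// Callers discard and recreate a poisoned worker rather than reusing it.
std::unique_ptr<AmqpWorker> freshWorkerIfPoisoned(std::unique_ptr<AmqpWorker> worker) {
  if (!worker || worker->poisoned()) {
    worker = std::make_unique<AmqpWorker>();
  }
  return worker;
}
```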
   
   https://issues.apache.org/jira/browse/NIFI-6312
   
   Thank you for submitting a contribution to Apache NiFi.
   
   Please provide a short description of the PR here:
   
    Description of PR
   
   _Enables X functionality; fixes bug NIFI-YYYY._
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced 
in the commit message?
   
   - [ ] Does your PR title start with **NIFI-XXXX** where XXXX is the JIRA 
number you are trying to resolve? Pay particular attention to the hyphen "-" 
character.
   
   - [ ] Has your PR been rebased against the latest commit within the target 
branch (typically `main`)?
   
   - [ ] Is your initial contribution a single, squashed commit? _Additional 
commits in response to PR reviewer feedback should be made on this branch and 
pushed to allow change tracking. Do not `squash` or use `--force` when pushing 
to allow for clean monitoring of changes._
   
   ### For code changes:
   - [ ] Have you ensured that the full suite of tests is executed via `mvn 
-Pcontrib-check clean install` at the root `nifi` folder?
   - [ ] Have you written or updated unit tests to verify your changes?
   - [ ] Have you verified that the full build is successful on JDK 8?
   - [ ] Have you verified that the full build is successful on JDK 11?
   - [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
   - [ ] If applicable, have you updated the `LICENSE` file, including the main 
`LICENSE` file under `nifi-assembly`?
   - [ ] If applicable, have you updated the `NOTICE` file, including the main 
`NOTICE` file found under `nifi-assembly`?
   - [ ] If adding new Properties, have you added `.displayName` in addition to 
.name (programmatic access) for each of the new properties?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which 
it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI for 
build issues and submit an update to your PR as soon as possible.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (NIFI-6312) AMQP processors seem to have thread cleanup issues

2020-07-15 Thread Peter Turcsanyi (Jira)


 [ 
https://issues.apache.org/jira/browse/NIFI-6312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Peter Turcsanyi reassigned NIFI-6312:
-

Assignee: Peter Turcsanyi

> AMQP processors seem to have thread cleanup issues
> --
>
> Key: NIFI-6312
> URL: https://issues.apache.org/jira/browse/NIFI-6312
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Extensions
>Affects Versions: 1.9.0
>Reporter: Robert Bruno
>Assignee: Peter Turcsanyi
>Priority: Major
>
> At a minimum the ConsumeAMQP processor exhibits this behavior but the 
> PublishAMQP may as well.
> If ConsumeAMQP is listening to a working AMQP server and then that server 
> name is no longer resolvable errors begin to show up in logs saying the 
> hostname can't be resolved.  This is expected.
> What isn't expected is if you then turn off the processor or even delete the 
> processor the error message persists.  The only way to resolve this is 
> restarting the nifi node.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (NIFI-7645) Implement AMQPRecord processors

2020-07-15 Thread Peter Turcsanyi (Jira)
Peter Turcsanyi created NIFI-7645:
-

 Summary: Implement AMQPRecord processors
 Key: NIFI-7645
 URL: https://issues.apache.org/jira/browse/NIFI-7645
 Project: Apache NiFi
  Issue Type: Improvement
Reporter: Peter Turcsanyi
Assignee: Peter Turcsanyi






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (NIFI-7628) Parameter context in multi tenant environment

2020-07-15 Thread Bryan Bende (Jira)


[ 
https://issues.apache.org/jira/browse/NIFI-7628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17158700#comment-17158700
 ] 

Bryan Bende commented on NIFI-7628:
---

You can also have read/write policies on parameter contexts; in order to bind 
one to a process group, you need read permissions on the context and write 
permissions on the process group.

> Parameter context in multi tenant environment
> -
>
> Key: NIFI-7628
> URL: https://issues.apache.org/jira/browse/NIFI-7628
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Flow Versioning, SDLC
>Reporter: naveen kumar saharan
>Priority: Major
>
> In a multi-tenant environment, it's not possible to restrict the parameters for 
> different BU/vertical/tenant.
> Currently you can either give access (view or modify) to a PG or not.
>  
> But in a multi-tenant environment we want BU/vertical/tenant-specific 
> access on Param context so that people don't disturb each other's work; it's 
> also important with regard to security and compliance.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (NIFI-7644) New Relic reporting task

2020-07-15 Thread Ihar Hulevich (Jira)
Ihar Hulevich created NIFI-7644:
---

 Summary: New Relic reporting task
 Key: NIFI-7644
 URL: https://issues.apache.org/jira/browse/NIFI-7644
 Project: Apache NiFi
  Issue Type: New Feature
Reporter: Ihar Hulevich


Integration between NiFi and New Relic for sending metrics; it is expected to be 
a separate New Relic bundle.
The reporting task should support two options for integration with New Relic: 
Agent and API.
For example, see how it is implemented in Micrometer:
[https://github.com/micrometer-metrics/micrometer/blob/master/implementations/micrometer-registry-new-relic/src/main/java/io/micrometer/newrelic/NewRelicInsightsAgentClientProvider.java]
[https://github.com/micrometer-metrics/micrometer/blob/master/implementations/micrometer-registry-new-relic/src/main/java/io/micrometer/newrelic/NewRelicInsightsApiClientProvider.java]

The number of metrics should be no fewer than in the Datadog integration 
[https://github.com/apache/nifi/blob/main/nifi-nar-bundles/nifi-datadog-bundle/nifi-datadog-reporting-task/src/main/java/org/apache/nifi/reporting/datadog/metrics/MetricNames.java#L24-L73]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (NIFI-7643) Unpackcontent writing absolute path property does not make sense

2020-07-15 Thread Jira


 [ 
https://issues.apache.org/jira/browse/NIFI-7643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tamás Bunth updated NIFI-7643:
--
Description: 
Steps to reproduce:
 # Create a tar or zip file with some arbitrary content in its root. The bug 
occurs only when using these two formats.
 # Create a flow: GetFile -> UnpackContent -> LogAttribute.
 # Set GetFile to fetch the compressed test file from the file system. Set 
UnpackContent to use the appropriate format.

Unpackcontent writes an attribute "file.absolutePath" which is currently the 
relative path of the unpacked content concatenated with the current working 
directory (where Nifi has been run).

E.g. if I unpacked a file with relative path "egg/ham.txt" and I run Nifi from 
"/usr/bin", "file.absolutePath" would be "/usr/bin/egg/ham.txt".

In my opinion, "absolute path" does not make much sense in this context. I 
suggest removing it.

  was:
Steps to reproduce:
 * Create a tar or zip file with some arbitrary content in its root. The bug 
occurs only when using these two formats.
 * Create a flow: GetFile -> UnpackContent -> LogAttribute.
 * Set GetFile to fetch the compressed test file from the file system. Set 
UnpackContent to use the appropriate format.

Unpackcontent writes an attribute "file.absolutePath" which is currently the 
relative path of the unpacked content concatenated with the current working 
directory (where Nifi has been run).

E.g. if I unpacked a file with relative path "egg/ham.txt" and I run Nifi from 
"/usr/bin", "file.absolutePath" would be "/usr/bin/egg/ham.txt".

In my opinion, "absolute path" does not make much sense in this context. I 
suggest removing it.


> Unpackcontent writing absolute path property does not make sense
> 
>
> Key: NIFI-7643
> URL: https://issues.apache.org/jira/browse/NIFI-7643
> Project: Apache NiFi
>  Issue Type: Bug
>Reporter: Tamás Bunth
>Priority: Major
>
> Steps to reproduce:
>  # Create a tar or zip file with some arbitrary content in its root. The bug 
> occurs only when using these two formats.
>  # Create a flow: GetFile -> UnpackContent -> LogAttribute.
>  # Set GetFile to fetch the compressed test file from the file system. Set 
> UnpackContent to use the appropriate format.
> Unpackcontent writes an attribute "file.absolutePath" which is currently the 
> relative path of the unpacked content concatenated with the current working 
> directory (where Nifi has been run).
> E.g. if I unpacked a file with relative path "egg/ham.txt" and I run Nifi 
> from "/usr/bin", "file.absolutePath" would be "/usr/bin/egg/ham.txt".
> In my opinion, "absolute path" does not make much sense in this context. I 
> suggest removing it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (NIFI-7643) Unpackcontent writing absolute path property does not make sense

2020-07-15 Thread Jira
Tamás Bunth created NIFI-7643:
-

 Summary: Unpackcontent writing absolute path property does not 
make sense
 Key: NIFI-7643
 URL: https://issues.apache.org/jira/browse/NIFI-7643
 Project: Apache NiFi
  Issue Type: Bug
Reporter: Tamás Bunth


Steps to reproduce:
 * Create a tar or zip file with some arbitrary content in its root. The bug 
occurs only when using these two formats.
 * Create a flow: GetFile -> UnpackContent -> LogAttribute.
 * Set GetFile to fetch the compressed test file from the file system. Set 
UnpackContent to use the appropriate format.

Unpackcontent writes an attribute "file.absolutePath" which is currently the 
relative path of the unpacked content concatenated with the current working 
directory (where Nifi has been run).

E.g. if I unpacked a file with relative path "egg/ham.txt" and I run Nifi from 
"/usr/bin", "file.absolutePath" would be "/usr/bin/egg/ham.txt".

In my opinion, "absolute path" does not make much sense in this context. I 
suggest removing it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [nifi-minifi-cpp] arpadboda commented on pull request #836: MINIFICPP-1248 Create unit tests for the ConsumeWindowsEventLog processor

2020-07-15 Thread GitBox


arpadboda commented on pull request #836:
URL: https://github.com/apache/nifi-minifi-cpp/pull/836#issuecomment-658855469


   @fgerlits : 
   The stringutils part seems to fail on gcc:
   https://travis-ci.org/github/apache/nifi-minifi-cpp/jobs/708347373



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] arpadboda commented on pull request #821: MINIFICPP-1251 - Implement and test RetryFlowFile processor

2020-07-15 Thread GitBox


arpadboda commented on pull request #821:
URL: https://github.com/apache/nifi-minifi-cpp/pull/821#issuecomment-658816200


   @hunyadi-dev : could you rebase and push? 
   The changes look good to me, but I would like to see it build on Win without 
errors.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] goosalex opened a new pull request #4410: NIFI-7642: Improve batching into FlowFiles under high latency conditions

2020-07-15 Thread GitBox


goosalex opened a new pull request #4410:
URL: https://github.com/apache/nifi/pull/4410


   Thank you for submitting a contribution to Apache NiFi.
   
   Please provide a short description of the PR here:
   
    Description of PR
   
   Improves receiving large batches of Kafka messages per FlowFile when network 
latency is high. Fixes bug NIFI-7642.
   Wires existing processor properties to:
   * Remove 1000 records "magic number"
   * Remove 10ms poll timeout "magic number"
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [x] Is there a JIRA ticket associated with this PR? Is it referenced 
in the commit message?
   
   - [x] Does your PR title start with **NIFI-XXXX** where XXXX is the JIRA 
number you are trying to resolve? Pay particular attention to the hyphen "-" 
character.
   
   - [x] Has your PR been rebased against the latest commit within the target 
branch (typically `main`)?
   
   - [x] Is your initial contribution a single, squashed commit? _Additional 
commits in response to PR reviewer feedback should be made on this branch and 
pushed to allow change tracking. Do not `squash` or use `--force` when pushing 
to allow for clean monitoring of changes._
   
   ### For code changes:
   - [x] Have you ensured that the full suite of tests is executed via `mvn 
-Pcontrib-check clean install` at the root `nifi` folder?
   - [x] Have you written or updated unit tests to verify your changes?
   - [x] Have you verified that the full build is successful on JDK 8?
   - [x] Have you verified that the full build is successful on JDK 11?
   - [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
   - [ ] If applicable, have you updated the `LICENSE` file, including the main 
`LICENSE` file under `nifi-assembly`?
   - [ ] If applicable, have you updated the `NOTICE` file, including the main 
`NOTICE` file found under `nifi-assembly`?
   - [ ] If adding new Properties, have you added `.displayName` in addition to 
.name (programmatic access) for each of the new properties?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which 
it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI for 
build issues and submit an update to your PR as soon as possible.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] fgerlits opened a new pull request #840: MINIFICPP-1293 Fix the failing PropertyTests

2020-07-15 Thread GitBox


fgerlits opened a new pull request #840:
URL: https://github.com/apache/nifi-minifi-cpp/pull/840


   See https://issues.apache.org/jira/browse/MINIFICPP-1293 for the description 
of the bug.
   
   Description of the change: instead of using `mktime` and adjusting by the 
time zone offset, use `_mkgmtime` on Windows where it is available, and use a 
hand-written version of `mkgmtime` elsewhere.
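
   For background, a hand-written UTC `mkgmtime` typically converts the 
broken-down time to a day count with a civil-calendar formula and then adds the 
time of day, avoiding `mktime`'s dependence on the local time zone. A minimal 
sketch under that assumption (not the PR's actual code):

```cpp
#include <cstdint>
#include <ctime>

// Sketch of a portable mkgmtime using the days-from-civil algorithm.
// Takes a broken-down UTC time; returns seconds since 1970-01-01T00:00:00Z.
int64_t portable_mkgmtime(const struct tm& t) {
  int64_t y = t.tm_year + 1900;
  const int64_t m = t.tm_mon + 1;  // tm_mon is 0-based
  const int64_t d = t.tm_mday;     // tm_mday is 1-based
  y -= m <= 2;                     // treat Jan/Feb as months 13/14 of the previous year
  const int64_t era = (y >= 0 ? y : y - 399) / 400;
  const int64_t yoe = y - era * 400;                                   // [0, 399]
  const int64_t doy = (153 * (m + (m > 2 ? -3 : 9)) + 2) / 5 + d - 1;  // [0, 365]
  const int64_t doe = yoe * 365 + yoe / 4 - yoe / 100 + doy;           // [0, 146096]
  const int64_t days_since_epoch = era * 146097 + doe - 719468;
  return ((days_since_epoch * 24 + t.tm_hour) * 60 + t.tm_min) * 60 + t.tm_sec;
}
```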
   
   ---
   
   Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced
in the commit message?
   
   - [ ] Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA 
number you are trying to resolve? Pay particular attention to the hyphen "-" 
character.
   
   - [ ] Has your PR been rebased against the latest commit within the target 
branch (typically master)?
   
   - [ ] Is your initial contribution a single, squashed commit?
   
   ### For code changes:
   - [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)?
   - [ ] If applicable, have you updated the LICENSE file?
   - [ ] If applicable, have you updated the NOTICE file?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which 
it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] goosalex closed pull request #4409: Nifi 7642 - Wire KafkaConsumer properties instead of magic numbers to improve batching into fewer FlowFiles under high latency conditions

2020-07-15 Thread GitBox


goosalex closed pull request #4409:
URL: https://github.com/apache/nifi/pull/4409


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #833: MINIFICPP-1272 - Graceful shutdown on flow update

2020-07-15 Thread GitBox


arpadboda commented on a change in pull request #833:
URL: https://github.com/apache/nifi-minifi-cpp/pull/833#discussion_r455083609



##
File path: libminifi/src/core/ProcessGroup.cpp
##
@@ -412,6 +414,18 @@ void ProcessGroup::drainConnections() {
   }
 }
 
+std::size_t ProcessGroup::getTotalFlowFileCount() const {
+  std::size_t sum = 0;
+  for (auto& conn : connections_) {

Review comment:
   Fair point, not sure which option is better, so feel free to choose. You 
can also mark this thread resolved. 
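
   (The competing options are not spelled out in the quoted context; one 
plausible alternative to the ranged-for loop in the diff is `std::accumulate`, 
sketched here with an assumed `getQueueSize()` accessor:)

```cpp
#include <numeric>

// Assumed sketch: same sum as the ranged-for loop in the diff, written with
// std::accumulate; the getQueueSize() accessor is an assumption here.
std::size_t ProcessGroup::getTotalFlowFileCount() const {
  return std::accumulate(connections_.begin(), connections_.end(), std::size_t{0},
      [](std::size_t sum, const auto& conn) {
        return sum + conn->getQueueSize();
      });
}
```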





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #833: MINIFICPP-1272 - Graceful shutdown on flow update

2020-07-15 Thread GitBox


arpadboda commented on a change in pull request #833:
URL: https://github.com/apache/nifi-minifi-cpp/pull/833#discussion_r455083106



##
File path: libminifi/test/flow-tests/FlowControllerTests.cpp
##
@@ -119,7 +107,125 @@ TEST_CASE("Flow shutdown drains connections", 
"[TestFlow1]") {
 
   controller->stop(true);
 
+  REQUIRE(sinkProc->trigger_count == 0);
+
   for (auto& it : connectionMap) {
 REQUIRE(it.second->isEmpty());
   }
 }
+
+TEST_CASE("Flow shutdown waits for a while", "[TestFlow2]") {
+  TestControllerWithFlow testController(yamlConfig);
+  auto controller = testController.controller_;
+  auto root = testController.root_;
+
+  auto sourceProc = 
std::static_pointer_cast(root->findProcessor("Generator"));
+  auto sinkProc = 
std::static_pointer_cast(root->findProcessor("TestProcessor"));
+
+  // prevent the initial trigger
+  // in case the source got triggered
+  // and the scheduler triggers the sink
+  // before we could initiate the shutdown
+  sinkProc->yield(100);
+
+  testController.startFlow();
+
+  // wait for the source processor to enqueue its flowFiles
+  std::this_thread::sleep_for(std::chrono::milliseconds{50});
+
+  REQUIRE(sourceProc->trigger_count.load() == 1);
+  REQUIRE(sinkProc->trigger_count.load() == 0);
+
+  controller->stop(true);
+
+  REQUIRE(sourceProc->trigger_count.load() == 1);
+  REQUIRE(sinkProc->trigger_count.load() == 3);
+}
+
+TEST_CASE("Flow stopped after grace period", "[TestFlow3]") {
+  TestControllerWithFlow testController(yamlConfig);
+  auto controller = testController.controller_;
+  auto root = testController.root_;
+
+  
testController.configuration_->set(minifi::Configure::nifi_flowcontroller_drain_timeout,
 "1000 ms");
+
+  auto sourceProc = 
std::static_pointer_cast(root->findProcessor("Generator"));
+  auto sinkProc = 
std::static_pointer_cast(root->findProcessor("TestProcessor"));
+
+  // prevent the initial trigger
+  // in case the source got triggered
+  // and the scheduler triggers the sink
+  sinkProc->yield(100);
+
+  sinkProc->onTriggerCb_ = [&]{
+static std::atomic<bool> first_onTrigger{true};
+bool isFirst = true;
+// sleep only on the first trigger
+if (first_onTrigger.compare_exchange_strong(isFirst, false)) {
+  std::this_thread::sleep_for(std::chrono::milliseconds{1500});
+}
+  };
+
+  testController.startFlow();
+
+  // wait for the source processor to enqueue its flowFiles
+  std::this_thread::sleep_for(std::chrono::milliseconds{50});

Review comment:
   Sounds good to me, thanks!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi] goosalex opened a new pull request #4409: Nifi 7642 - Wire KafkaConsumer properties instead of magic numbers to improve batching into fewer FlowFiles under high latency conditions

2020-07-15 Thread GitBox


goosalex opened a new pull request #4409:
URL: https://github.com/apache/nifi/pull/4409


   Thank you for submitting a contribution to Apache NiFi.
   
   Please provide a short description of the PR here:
   
    Description of PR
   
   Improves receiving large batches of Kafka messages per FlowFile when network 
latency is high. Fixes bug NIFI-7642.
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [x] Is there a JIRA ticket associated with this PR? Is it referenced 
in the commit message?
   
   - [x] Does your PR title start with **NIFI-XXXX** where XXXX is the JIRA 
number you are trying to resolve? Pay particular attention to the hyphen "-" 
character.
   
   - [x] Has your PR been rebased against the latest commit within the target 
branch (typically `main`)?
   
   - [x] Is your initial contribution a single, squashed commit? _Additional 
commits in response to PR reviewer feedback should be made on this branch and 
pushed to allow change tracking. Do not `squash` or use `--force` when pushing 
to allow for clean monitoring of changes._
   
   ### For code changes:
   - [x] Have you ensured that the full suite of tests is executed via `mvn 
-Pcontrib-check clean install` at the root `nifi` folder?
   - [x] Have you written or updated unit tests to verify your changes?
   - [x] Have you verified that the full build is successful on JDK 8?
   - [x] Have you verified that the full build is successful on JDK 11?
   - [-] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
   - [-] If applicable, have you updated the `LICENSE` file, including the main 
`LICENSE` file under `nifi-assembly`?
   - [-] If applicable, have you updated the `NOTICE` file, including the main 
`NOTICE` file found under `nifi-assembly`?
   - [-] If adding new Properties, have you added `.displayName` in addition to 
.name (programmatic access) for each of the new properties?
   
   ### For documentation related changes:
   - [-] Have you ensured that format looks appropriate for the output in which 
it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI for 
build issues and submit an update to your PR as soon as possible.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (NIFI-7628) Parameter context in multi tenant environment

2020-07-15 Thread Pierre Villard (Jira)


 [ 
https://issues.apache.org/jira/browse/NIFI-7628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pierre Villard updated NIFI-7628:
-
Component/s: SDLC
 Flow Versioning

> Parameter context in multi tenant environment
> -
>
> Key: NIFI-7628
> URL: https://issues.apache.org/jira/browse/NIFI-7628
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Flow Versioning, SDLC
>Reporter: naveen kumar saharan
>Priority: Major
>
> In a multi-tenant environment, it's not possible to restrict the parameters for 
> different BU/vertical/tenant.
> Currently you can either give access (view or modify) to a PG or not.
>  
> But in a multi-tenant environment we want BU/vertical/tenant-specific 
> access on Param context so that people don't disturb each other's work; it's 
> also important with regard to security and compliance.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (NIFI-7628) Parameter context in multi tenant environment

2020-07-15 Thread Pierre Villard (Jira)


[ 
https://issues.apache.org/jira/browse/NIFI-7628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17158178#comment-17158178
 ] 

Pierre Villard commented on NIFI-7628:
--

Not sure I understand the request here. A parameter context is attached to a 
process group and you can have a process group per BU / tenant. What is the 
issue? What would be the expected behavior?

> Parameter context in multi tenant environment
> -
>
> Key: NIFI-7628
> URL: https://issues.apache.org/jira/browse/NIFI-7628
> Project: Apache NiFi
>  Issue Type: Improvement
>  Components: Flow Versioning, SDLC
>Reporter: naveen kumar saharan
>Priority: Major
>
> In a multi-tenant environment, it's not possible to restrict the parameters for 
> different BU/vertical/tenant.
> Currently you can either give access (view or modify) to a PG or not.
>  
> But in a multi-tenant environment we want BU/vertical/tenant-specific 
> access on Param context so that people don't disturb each other's work; it's 
> also important with regard to security and compliance.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (NIFI-7628) Parameter context in multi tenant environment

2020-07-15 Thread Pierre Villard (Jira)


 [ 
https://issues.apache.org/jira/browse/NIFI-7628?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pierre Villard updated NIFI-7628:
-
Issue Type: Improvement  (was: Bug)

> Parameter context in multi tenant environment
> -
>
> Key: NIFI-7628
> URL: https://issues.apache.org/jira/browse/NIFI-7628
> Project: Apache NiFi
>  Issue Type: Improvement
>Reporter: naveen kumar saharan
>Priority: Major
>
> In a multi-tenant environment, it's not possible to restrict the parameters for 
> different BU/vertical/tenant.
> Currently you can either give access (view or modify) to a PG or not.
>  
> But in a multi-tenant environment we want BU/vertical/tenant-specific 
> access on Param context so that people don't disturb each other's work; it's 
> also important with regard to security and compliance.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [nifi-minifi-cpp] arpadboda closed pull request #838: MINIFICPP-1183 Fix Windows compile errors

2020-07-15 Thread GitBox


arpadboda closed pull request #838:
URL: https://github.com/apache/nifi-minifi-cpp/pull/838


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (NIFIREG-344) ClientUtils reads "Content-Disposition" before checking status code

2020-07-15 Thread Bryan Bende (Jira)


 [ 
https://issues.apache.org/jira/browse/NIFIREG-344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bryan Bende updated NIFIREG-344:

Fix Version/s: (was: 0.7.0)

> ClientUtils reads "Content-Disposition" before checking status code
> ---
>
> Key: NIFIREG-344
> URL: https://issues.apache.org/jira/browse/NIFIREG-344
> Project: NiFi Registry
>  Issue Type: Bug
>Affects Versions: 0.5.0
>Reporter: Bryan Bende
>Assignee: Bryan Bende
>Priority: Minor
>
> The issue is that if the response is something other than 200, the code still 
> tries to get the Content-Disposition header, which won't be there, and then 
> throws IllegalStateException; really there should have been a specific 
> exception for things like a 404.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (NIFIREG-292) Add database implementations of UserGroupProvider and AccessPolicyProvider

2020-07-15 Thread Bryan Bende (Jira)


 [ 
https://issues.apache.org/jira/browse/NIFIREG-292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bryan Bende updated NIFIREG-292:

Fix Version/s: (was: 0.7.0)

> Add database implementations of UserGroupProvider and AccessPolicyProvider
> --
>
> Key: NIFIREG-292
> URL: https://issues.apache.org/jira/browse/NIFIREG-292
> Project: NiFi Registry
>  Issue Type: Improvement
>Reporter: Bryan Bende
>Assignee: Bryan Bende
>Priority: Major
>  Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> We should offer database backed implementations of UserGroupProvider and 
> AccessPolicyProvider as an alternative to the file-based impls. We have LDAP 
> and Ranger alternatives, but for people not using those, the DB impls would 
> be a good way to get the data off the local filesystem.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (NIFIREG-321) Integrate nifi-registry-revsion into REST API and service layers

2020-07-15 Thread Bryan Bende (Jira)


 [ 
https://issues.apache.org/jira/browse/NIFIREG-321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bryan Bende resolved NIFIREG-321.
-
Resolution: Fixed

> Integrate nifi-registry-revsion into REST API and service layers
> 
>
> Key: NIFIREG-321
> URL: https://issues.apache.org/jira/browse/NIFIREG-321
> Project: NiFi Registry
>  Issue Type: Improvement
>Reporter: Bryan Bende
>Assignee: Bryan Bende
>Priority: Major
> Fix For: 0.7.0
>
>  Time Spent: 3h
>  Remaining Estimate: 0h
>
> In NIFIREG-300 we set up the revision modules which represent a library for 
> implementing optimistic locking using the same approach NiFi currently 
> implements.
> This ticket is the follow on work to now integrate the revision concept into 
> NiFi Registry's REST API and service layer.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (NIFIREG-404) Perform release manager activities for 0.7.0 release

2020-07-15 Thread Bryan Bende (Jira)
Bryan Bende created NIFIREG-404:
---

 Summary: Perform release manager activities for 0.7.0 release
 Key: NIFIREG-404
 URL: https://issues.apache.org/jira/browse/NIFIREG-404
 Project: NiFi Registry
  Issue Type: Task
Reporter: Bryan Bende
Assignee: Bryan Bende
 Fix For: 0.7.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (NIFIREG-381) Skip integration tests in Github CI/Actions

2020-07-15 Thread Bryan Bende (Jira)


 [ 
https://issues.apache.org/jira/browse/NIFIREG-381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bryan Bende resolved NIFIREG-381.
-
Resolution: Fixed

> Skip integration tests in Github CI/Actions
> ---
>
> Key: NIFIREG-381
> URL: https://issues.apache.org/jira/browse/NIFIREG-381
> Project: NiFi Registry
>  Issue Type: Task
>Reporter: Joe Witt
>Assignee: Joe Witt
>Priority: Major
> Fix For: 0.7.0
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (NIFI-7642) KafkaConsumers: Improve batching into fewer FlowFiles under high latency conditions

2020-07-15 Thread Alex Goos (Jira)


 [ 
https://issues.apache.org/jira/browse/NIFI-7642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alex Goos updated NIFI-7642:

Summary: KafkaConsumers: Improve batching into fewer FlowFiles under high 
latency conditions  (was: KafkaConsumers: Improve batching into fewer FlowFiles 
under high latency conditios)

> KafkaConsumers: Improve batching into fewer FlowFiles under high latency 
> conditions
> ---
>
> Key: NIFI-7642
> URL: https://issues.apache.org/jira/browse/NIFI-7642
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Extensions
>Affects Versions: 1.3.0, 1.4.0, 1.5.0, 1.6.0, 1.7.0, 1.8.0, 1.7.1, 1.9.0, 
> 1.10.0, 1.9.1, 1.9.2, 1.11.0, 1.11.1, 1.11.2, 1.11.4
>Reporter: Alex Goos
>Priority: Minor
>  Labels: kafka
>
> NIFI-3962 introduced two hard-coded magic numbers into KafkaConsumers:
>  * maxRecords is capped at 1000, regardless of the property setting
>  * the poll timeout is fixed at 10ms
> Under high throughput & high latency conditions, this leads to overly small 
> files.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

2020-07-15 Thread GitBox


adamdebreceni commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r455004287



##
File path: libminifi/src/core/ProcessSession.cpp
##
@@ -540,111 +481,97 @@ void ProcessSession::import(const std::string& source, 
std::vector<uint8_t> buffer(getpagesize());
   try {
-try {
-  std::ifstream input{source, std::ios::in | std::ios::binary};
-  logger_->log_debug("Opening %s", source);
-  if (!input.is_open() || !input.good()) {
-throw Exception(FILE_OPERATION_EXCEPTION, 
utils::StringUtils::join_pack("File Import Error: failed to open file \'", 
source, "\'"));
+std::ifstream input{source, std::ios::in | std::ios::binary};
+logger_->log_debug("Opening %s", source);
+if (!input.is_open() || !input.good()) {
+  throw Exception(FILE_OPERATION_EXCEPTION, 
utils::StringUtils::join_pack("File Import Error: failed to open file \'", 
source, "\'"));
+}
+if (offset != 0U) {
+  input.seekg(offset, std::ifstream::beg);
+  if (!input.good()) {
+logger_->log_error("Seeking to %lu failed for file %s (does 
file/filesystem support seeking?)", offset, source);
+throw Exception(FILE_OPERATION_EXCEPTION, 
utils::StringUtils::join_pack("File Import Error: Couldn't seek to offset ", 
std::to_string(offset)));
   }
-  if (offset != 0U) {
-input.seekg(offset, std::ifstream::beg);
-if (!input.good()) {
-  logger_->log_error("Seeking to %lu failed for file %s (does 
file/filesystem support seeking?)", offset, source);
-  throw Exception(FILE_OPERATION_EXCEPTION, 
utils::StringUtils::join_pack("File Import Error: Couldn't seek to offset ", 
std::to_string(offset)));
-}
+}
+uint64_t startTime = 0U;
+while (input.good()) {
+  input.read(reinterpret_cast<char*>(buffer.data()), buffer.size());
+  std::streamsize read = input.gcount();
+  if (read < 0) {
+throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount 
returned negative value");
   }
-  uint64_t startTime = 0U;
-  while (input.good()) {
-input.read(reinterpret_cast<char*>(buffer.data()), buffer.size());
-std::streamsize read = input.gcount();
-if (read < 0) {
-  throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount 
returned negative value");
-}
-if (read == 0) {
-  logger_->log_trace("Finished reading input %s", source);
+  if (read == 0) {
+logger_->log_trace("Finished reading input %s", source);
+break;
+  } else {
+logging::LOG_TRACE(logger_) << "Read input of " << read;
+  }
+  uint8_t* begin = buffer.data();
+  uint8_t* end = begin + read;
+  while (true) {
+startTime = getTimeMillis();
+uint8_t* delimiterPos = std::find(begin, end, 
static_cast<uint8_t>(inputDelimiter));
+const auto len = gsl::narrow(delimiterPos - begin);
+
+logging::LOG_TRACE(logger_) << "Read input of " << read << " length is 
" << len << " is at end?" << (delimiterPos == end);
+/*
+ * We do not want to process the rest of the buffer after the last 
delimiter if
+ *  - we have reached EOF in the file (we would discard it anyway)
+ *  - there is nothing to process (the last character in the buffer is 
a delimiter)
+ */
+if (delimiterPos == end && (input.eof() || len == 0)) {
   break;
-} else {
-  logging::LOG_TRACE(logger_) << "Read input of " << read;
 }
-uint8_t* begin = buffer.data();
-uint8_t* end = begin + read;
-while (true) {
-  startTime = getTimeMillis();
-  uint8_t* delimiterPos = std::find(begin, end, 
static_cast<uint8_t>(inputDelimiter));
-  const auto len = gsl::narrow(delimiterPos - begin);
-
-  logging::LOG_TRACE(logger_) << "Read input of " << read << " length 
is " << len << " is at end?" << (delimiterPos == end);
-  /*
-   * We do not want to process the rest of the buffer after the last 
delimiter if
-   *  - we have reached EOF in the file (we would discard it anyway)
-   *  - there is nothing to process (the last character in the buffer 
is a delimiter)
-   */
-  if (delimiterPos == end && (input.eof() || len == 0)) {
-break;
-  }
-
-  /* Create claim and stream if needed and append data */
-  if (claim == nullptr) {
-startTime = getTimeMillis();
-claim = 
std::make_shared<ResourceClaim>(process_context_->getContentRepository());
-  }
-  if (stream == nullptr) {
-stream = process_context_->getContentRepository()->write(claim);
-  }
-  if (stream == nullptr) {
-logger_->log_error("Stream is null");
-rollback();
-return;
-  }
-  if (stream->write(begin, len) != len) {
-logger_->log_error("Error while writ

[jira] [Created] (MINIFICPP-1294) Move stream ref counting to the ResourceClaim class

2020-07-15 Thread Adam Debreceni (Jira)
Adam Debreceni created MINIFICPP-1294:
-

 Summary: Move stream ref counting to the ResourceClaim class
 Key: MINIFICPP-1294
 URL: https://issues.apache.org/jira/browse/MINIFICPP-1294
 Project: Apache NiFi MiNiFi C++
  Issue Type: Improvement
Reporter: Adam Debreceni
 Fix For: 1.0.0


Currently the FlowFile and others manually increment/decrement the refCount of 
each ResourceClaim. As each ResourceClaim also has a reference to the content 
repository (in the form of a StreamManager), it should be able to notify the 
manager when a ResourceClaim instance is created/destroyed.

The only place requiring manual refCount adjustments should be when we 
Serialize/Deserialize the FlowFiles.
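
A hedged sketch of the RAII-style ref counting the ticket describes (the type 
and method names below are assumptions for illustration, not the real MiNiFi 
interfaces):

```cpp
#include <memory>
#include <string>

// Hypothetical StreamManager interface, reduced to the two calls the
// ticket implies; the real interface lives in the content repository.
class StreamManager {
 public:
  virtual ~StreamManager() = default;
  virtual void incrementStreamCount(const std::string& claim_id) = 0;
  virtual void decrementStreamCount(const std::string& claim_id) = 0;
};

// Sketch: the claim notifies its manager on construction/destruction,
// so FlowFile and friends no longer adjust refCounts manually.
class ResourceClaim {
 public:
  ResourceClaim(std::shared_ptr<StreamManager> manager, std::string id)
      : manager_(std::move(manager)), id_(std::move(id)) {
    manager_->incrementStreamCount(id_);
  }
  ~ResourceClaim() {
    manager_->decrementStreamCount(id_);
  }
  ResourceClaim(const ResourceClaim&) = delete;             // non-copyable:
  ResourceClaim& operator=(const ResourceClaim&) = delete;  // one claim, one count

 private:
  std::shared_ptr<StreamManager> manager_;
  std::string id_;
};
```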



--
This message was sent by Atlassian Jira
(v8.3.4#803005)



[jira] [Updated] (NIFI-7642) KafkaConsumers: Improve batching into fewer FlowFiles under high latency conditions

2020-07-15 Thread Alex Goos (Jira)


 [ 
https://issues.apache.org/jira/browse/NIFI-7642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alex Goos updated NIFI-7642:

Labels: kafka  (was: )

> KafkaConsumers: Improve batching into fewer FlowFiles under high latency 
> conditions
> --
>
> Key: NIFI-7642
> URL: https://issues.apache.org/jira/browse/NIFI-7642
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Extensions
>Affects Versions: 1.3.0, 1.4.0, 1.5.0, 1.6.0, 1.7.0, 1.8.0, 1.7.1, 1.9.0, 
> 1.10.0, 1.9.1, 1.9.2, 1.11.0, 1.11.1, 1.11.2, 1.11.4
>Reporter: Alex Goos
>Priority: Minor
>  Labels: kafka
>
> NIFI-3962 introduced two hard-coded magic numbers into KafkaConsumers:
>  * maxRecords is capped at 1000, regardless of the property setting
>  * the poll timeout is fixed at 10ms
> Under high throughput & high latency conditions, this leads to files that 
> are too small.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (NIFI-7642) KafkaConsumers: Improve batching into fewer FlowFiles under high latency conditions

2020-07-15 Thread Alex Goos (Jira)
Alex Goos created NIFI-7642:
---

 Summary: KafkaConsumers: Improve batching into fewer FlowFiles 
under high latency conditions
 Key: NIFI-7642
 URL: https://issues.apache.org/jira/browse/NIFI-7642
 Project: Apache NiFi
  Issue Type: Bug
  Components: Extensions
Affects Versions: 1.11.4, 1.11.2, 1.11.1, 1.11.0, 1.9.2, 1.9.1, 1.10.0, 
1.9.0, 1.7.1, 1.8.0, 1.7.0, 1.6.0, 1.5.0, 1.4.0, 1.3.0
Reporter: Alex Goos


NIFI-3962 introduced two hard-coded magic numbers into KafkaConsumers:
 * maxRecords is capped at 1000, regardless of the property setting
 * the poll timeout is fixed at 10ms

Under high throughput & high latency conditions, this leads to files that are 
too small.
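
For illustration only, a sketch (not NiFi's actual consumer code) of how a 
fixed poll deadline caps batch size: the loop returns at the deadline no 
matter how few records have arrived, so a high-latency link yields many tiny 
batches.

{code:c++}
#include <chrono>
#include <cstddef>
#include <utility>
#include <vector>

// Illustrative batch loop: accumulate records until either maxRecords is
// reached or the poll deadline expires. With maxRecords capped at 1000 and a
// fixed 10 ms deadline, slow links rarely fill a batch before time runs out.
template <typename Record, typename PollFn>
std::vector<Record> gatherBatch(PollFn poll, std::size_t maxRecords,
                                std::chrono::milliseconds pollTimeout) {
  std::vector<Record> batch;
  const auto deadline = std::chrono::steady_clock::now() + pollTimeout;
  while (batch.size() < maxRecords &&
         std::chrono::steady_clock::now() < deadline) {
    for (Record& r : poll()) {  // poll() returns whatever records are ready
      batch.push_back(std::move(r));
    }
  }
  return batch;  // under high latency, often nearly empty
}
{code}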



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (NIFI-7641) NullPointerException with ExecuteFlumeSource

2020-07-15 Thread Pierre Villard (Jira)
Pierre Villard created NIFI-7641:


 Summary: NullPointerException with ExecuteFlumeSource
 Key: NIFI-7641
 URL: https://issues.apache.org/jira/browse/NIFI-7641
 Project: Apache NiFi
  Issue Type: Bug
  Components: Extensions
Reporter: Pierre Villard


Faced an NPE when using the ExecuteFlumeSource processor. It seems to occur 
when the processor couldn't bind to the port because another processor was 
already using it.
{code:java}
2020-07-14 11:40:54,373 INFO org.apache.flume.source.DefaultSourceFactory: Creating instance of source src-1, type avro
2020-07-14 11:40:54,374 INFO org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent: Scheduled ExecuteFlumeSource[id=b8103a4f-871d-12f4-8530-997105f8c85d] to run with 1 threads
2020-07-14 11:40:54,374 INFO org.apache.flume.source.AvroSource: Avro source src-1 stopping: Avro source src-1: { bindAddress: 0.0.0.0, port: 50100 }
2020-07-14 11:40:54,375 ERROR org.apache.nifi.processors.flume.ExecuteFlumeSource: ExecuteFlumeSource[id=b8103a4f-871d-12f4-8530-997105f8c85d] ExecuteFlumeSource[id=b8103a4f-871d-12f4-8530-997105f8c85d] failed to process session due to java.lang.NullPointerException; Processor Administratively Yielded for 1 sec: java.lang.NullPointerException
java.lang.NullPointerException: null
    at org.apache.flume.source.AvroSource.stop(AvroSource.java:302)
    at org.apache.flume.source.EventDrivenSourceRunner.stop(EventDrivenSourceRunner.java:51)
    at org.apache.nifi.processors.flume.ExecuteFlumeSource.stopped(ExecuteFlumeSource.java:158)
    at org.apache.nifi.processors.flume.ExecuteFlumeSource.onTrigger(ExecuteFlumeSource.java:179)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1176)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:213)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
    at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2020-07-14 11:40:54,375 WARN org.apache.nifi.controller.tasks.ConnectableTask: Administratively Yielding ExecuteFlumeSource[id=b8103a4f-871d-12f4-8530-997105f8c85d] due to uncaught Exception: java.lang.NullPointerException
java.lang.NullPointerException: null
    at org.apache.flume.source.AvroSource.stop(AvroSource.java:302)
    at org.apache.flume.source.EventDrivenSourceRunner.stop(EventDrivenSourceRunner.java:51)
    at org.apache.nifi.processors.flume.ExecuteFlumeSource.stopped(ExecuteFlumeSource.java:158)
    at org.apache.nifi.processors.flume.ExecuteFlumeSource.onTrigger(ExecuteFlumeSource.java:179)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1176)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:213)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
    at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2020-07-14 11:40:54,898 INFO org.apache.nifi.controller.StandardFlowService: Saved flow controller org.apache.nifi.controller.FlowController@3a534280 // Another save pending = false
2020-07-14 11:40:55,991 INFO org.apache.nifi.controller.scheduling.StandardProcessScheduler: Starting ExecuteFlumeSource[id=21e134ef-a08b-1570-89a5-064680f551a0]
2020-07-14 11:40:55,991 INFO org.apache.nifi.controller.StandardProcessorNode: Starting ExecuteFlumeSource[id=21e134ef-a08b-1570-89a5-064680f551a0]
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

[jira] [Created] (NIFI-7640) HandleHttpRequest - define temporary files location

2020-07-15 Thread Pierre Villard (Jira)
Pierre Villard created NIFI-7640:


 Summary: HandleHttpRequest - define temporary files location
 Key: NIFI-7640
 URL: https://issues.apache.org/jira/browse/NIFI-7640
 Project: Apache NiFi
  Issue Type: Improvement
  Components: Extensions
Reporter: Pierre Villard


A property should be added to the processor allowing users to define where 
temporary files are located.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [nifi] KuKuDeCheng commented on pull request #4326: NIFI-7519:Add a ExecuteSQL property on DBCPConnectionPool

2020-07-15 Thread GitBox


KuKuDeCheng commented on pull request #4326:
URL: https://github.com/apache/nifi/pull/4326#issuecomment-658667516


   > I'm not sure about the GitHub checks error.
   > I ran this check successfully on my computer.
   
   You need to update DatabaseRecordSinkTest.java. Add your ExecuteSQL property 
like this:
   `when(dbContext.getProperty(DATABASE_URL)).thenReturn(new MockPropertyValue("jdbc:derby:${DB_LOCATION}"))`



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (NIFI-7635) Fix StandardConfigurationContext to read the default property value from its ComponentNode instance

2020-07-15 Thread Peter Gyori (Jira)


 [ 
https://issues.apache.org/jira/browse/NIFI-7635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Peter Gyori updated NIFI-7635:
--
Resolution: Fixed
Status: Resolved  (was: Patch Available)

> Fix StandardConfigurationContext to read the default property value from its 
> ComponentNode instance
> ---
>
> Key: NIFI-7635
> URL: https://issues.apache.org/jira/browse/NIFI-7635
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Core Framework
>Reporter: Peter Gyori
>Assignee: Peter Gyori
>Priority: Major
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> If an effective property value is not set, 
> StandardConfigurationContext.getProperty(PropertyDescriptor property) reads 
> the default property value from its parameter (property) instead of the 
> ComponentNode instance. This results in faulty behavior when e.g. a 
> controller service is enabled with its default settings and one or more of 
> these default values differ from the default values of the supplied 
> PropertyDescriptor.
> A fix needs to be implemented to get the property descriptor and its default 
> value from the component itself and not the supplied PropertyDescriptor 
> parameter.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [nifi] pgyori commented on pull request #4408: NIFI-7635: StandardConfigurationContext.getProperty() gets the proper…

2020-07-15 Thread GitBox


pgyori commented on pull request #4408:
URL: https://github.com/apache/nifi/pull/4408#issuecomment-658628455


   Thank you, @markap14 !



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

2020-07-15 Thread GitBox


adamdebreceni commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r454874069



##
File path: libminifi/src/core/ProcessSession.cpp
##
@@ -540,111 +481,97 @@ void ProcessSession::import(const std::string& source,
   std::vector<uint8_t> buffer(getpagesize());
   try {
-    try {
-      std::ifstream input{source, std::ios::in | std::ios::binary};
-      logger_->log_debug("Opening %s", source);
-      if (!input.is_open() || !input.good()) {
-        throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: failed to open file \'", source, "\'"));
+    std::ifstream input{source, std::ios::in | std::ios::binary};
+    logger_->log_debug("Opening %s", source);
+    if (!input.is_open() || !input.good()) {
+      throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: failed to open file \'", source, "\'"));
+    }
+    if (offset != 0U) {
+      input.seekg(offset, std::ifstream::beg);
+      if (!input.good()) {
+        logger_->log_error("Seeking to %lu failed for file %s (does file/filesystem support seeking?)", offset, source);
+        throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: Couldn't seek to offset ", std::to_string(offset)));
       }
-      if (offset != 0U) {
-        input.seekg(offset, std::ifstream::beg);
-        if (!input.good()) {
-          logger_->log_error("Seeking to %lu failed for file %s (does file/filesystem support seeking?)", offset, source);
-          throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: Couldn't seek to offset ", std::to_string(offset)));
-        }
+    }
+    uint64_t startTime = 0U;
+    while (input.good()) {
+      input.read(reinterpret_cast<char*>(buffer.data()), buffer.size());
+      std::streamsize read = input.gcount();
+      if (read < 0) {
+        throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount returned negative value");
       }
-      uint64_t startTime = 0U;
-      while (input.good()) {
-        input.read(reinterpret_cast<char*>(buffer.data()), buffer.size());
-        std::streamsize read = input.gcount();
-        if (read < 0) {
-          throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount returned negative value");
-        }
-        if (read == 0) {
-          logger_->log_trace("Finished reading input %s", source);
+      if (read == 0) {
+        logger_->log_trace("Finished reading input %s", source);
+        break;
+      } else {
+        logging::LOG_TRACE(logger_) << "Read input of " << read;
+      }
+      uint8_t* begin = buffer.data();
+      uint8_t* end = begin + read;
+      while (true) {
+        startTime = getTimeMillis();
+        uint8_t* delimiterPos = std::find(begin, end, static_cast<uint8_t>(inputDelimiter));
+        const auto len = gsl::narrow<int>(delimiterPos - begin);
+
+        logging::LOG_TRACE(logger_) << "Read input of " << read << " length is " << len << " is at end?" << (delimiterPos == end);
+        /*
+         * We do not want to process the rest of the buffer after the last delimiter if
+         *  - we have reached EOF in the file (we would discard it anyway)
+         *  - there is nothing to process (the last character in the buffer is a delimiter)
+         */
+        if (delimiterPos == end && (input.eof() || len == 0)) {
          break;
-        } else {
-          logging::LOG_TRACE(logger_) << "Read input of " << read;
        }
-        uint8_t* begin = buffer.data();
-        uint8_t* end = begin + read;
-        while (true) {
-          startTime = getTimeMillis();
-          uint8_t* delimiterPos = std::find(begin, end, static_cast<uint8_t>(inputDelimiter));
-          const auto len = gsl::narrow<int>(delimiterPos - begin);
-
-          logging::LOG_TRACE(logger_) << "Read input of " << read << " length is " << len << " is at end?" << (delimiterPos == end);
-          /*
-           * We do not want to process the rest of the buffer after the last delimiter if
-           *  - we have reached EOF in the file (we would discard it anyway)
-           *  - there is nothing to process (the last character in the buffer is a delimiter)
-           */
-          if (delimiterPos == end && (input.eof() || len == 0)) {
-            break;
-          }
-
-          /* Create claim and stream if needed and append data */
-          if (claim == nullptr) {
-            startTime = getTimeMillis();
-            claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
-          }
-          if (stream == nullptr) {
-            stream = process_context_->getContentRepository()->write(claim);
-          }
-          if (stream == nullptr) {
-            logger_->log_error("Stream is null");
-            rollback();
-            return;
-          }
-          if (stream->write(begin, len) != len) {
-            logger_->log_error("Error while writ

[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

2020-07-15 Thread GitBox


adamdebreceni commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r454871893



##
File path: extensions/rocksdb-repos/FlowFileRepository.cpp
##
@@ -148,22 +148,27 @@ void FlowFileRepository::prune_stored_flowfiles() {
     std::string key = it->key().ToString();
     if (eventRead->DeSerialize(reinterpret_cast<const uint8_t*>(it->value().data()), it->value().size())) {
       logger_->log_debug("Found connection for %s, path %s ", eventRead->getConnectionUuid(), eventRead->getContentFullPath());
-      auto search = connectionMap.find(eventRead->getConnectionUuid());
-      if (!corrupt_checkpoint && search != connectionMap.end()) {
+      bool found = false;
+      auto search = containers.find(eventRead->getConnectionUuid());
+      found = (search != containers.end());
+      if (!found) {
+        // for backward compatibility
+        search = connectionMap.find(eventRead->getConnectionUuid());
+        found = (search != connectionMap.end());
+      }

Review comment:
   done





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (NIFI-6061) PutDatabaseRecord does not properly handle BLOB/CLOB fields

2020-07-15 Thread ZhangCheng (Jira)


[ 
https://issues.apache.org/jira/browse/NIFI-6061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17157981#comment-17157981
 ] 

ZhangCheng commented on NIFI-6061:
--

For Oracle TIMESTAMP fields, when PutDatabaseRecord tries to insert the value 
via setObject(), it throws the exception ORA-01843: not a valid month.

I think this exception will be resolved after fixing this bug.

> PutDatabaseRecord does not properly handle BLOB/CLOB fields
> ---
>
> Key: NIFI-6061
> URL: https://issues.apache.org/jira/browse/NIFI-6061
> Project: Apache NiFi
>  Issue Type: Bug
>  Components: Extensions
>Reporter: Matt Burgess
>Priority: Major
>
> BLOB/CLOB fields in NiFi's Record API are returned from the record as 
> Object[Byte], but when PutDatabaseRecord tries to insert Object[] via 
> setObject(), the following error occurs:
> 2019-02-20 15:11:16,216 WARN [Timer-Driven Process Thread-10] 
> o.a.n.p.standard.PutDatabaseRecord 
> PutDatabaseRecord[id=0c84b9de-0169-1000-0164-3fbad7a17664] Failed to process 
> StandardFlowFileRecord[uuid=d739f432-0871-41bb-a0c9-d6ceeac68a6d,claim=StandardContentClaim
>  [resourceClaim=StandardResourceClaim[id=155069058-1, container=default, 
> section=1], offset=1728, 
> length=251],offset=0,name=d739f432-0871-41bb-a0c9-d6ceeac68a6d,size=251] due 
> to org.postgresql.util.PSQLException: Can't infer the SQL type to use for an 
> instance of [Ljava.lang.Object;. Use setObject() with an explicit Types value 
> to specify the type to use.: 
> Somewhere in the value conversion/representation, PutDatabaseRecord would 
> likely need to create a java.sql.Blob object and transfer the bytes into it. 
> One issue I see is that the record field type has been converted to 
> Array[Byte], so the information that the field is a BLOB is lost by that 
> point. If this requires DB-specific code, we'd likely need to add a Database 
> Adapter property and delegate out to the various DB adapters.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

2020-07-15 Thread GitBox


adamdebreceni commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r454824460



##
File path: libminifi/include/core/Repository.h
##
@@ -228,6 +232,8 @@ class Repository : public virtual core::SerializableComponent, public core::Trac
   Repository &operator=(const Repository &parent) = delete;
 
  protected:
+  std::map<std::string, std::shared_ptr<core::Connectable>> containers;

Review comment:
   currently `Connectable` declares the `put` method, meaning that 
`Connectable`s are exactly what can be containers; later I would like us to 
have a `FlowFileContainer` class and separate this capability out (a rough 
sketch follows below)
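
A rough sketch of that separation; only the `FlowFileContainer` name comes
from the comment above, the rest of the shape is an assumption:

```cpp
#include <memory>

namespace core {
class FlowFile;
class Connectable { /* connectivity only */ };
}  // namespace core

// Hypothetical interface: "can receive flow files" becomes an explicit
// capability instead of being implied by Connectable::put().
class FlowFileContainer {
 public:
  virtual void put(const std::shared_ptr<core::FlowFile>& flow) = 0;
  virtual ~FlowFileContainer() = default;
};

// A Connection would then model connectivity via Connectable and storage
// via FlowFileContainer.
class Connection : public core::Connectable, public FlowFileContainer {
 public:
  void put(const std::shared_ptr<core::FlowFile>& /*flow*/) override {}
};
```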





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

2020-07-15 Thread GitBox


adamdebreceni commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r454842291



##
File path: libminifi/include/core/FlowFile.h
##
@@ -35,9 +35,56 @@ namespace minifi {
 namespace core {
 
 class FlowFile : public core::Connectable, public ReferenceContainer {
+ private:
+  class FlowFileOwnedResourceClaimPtr {
+   public:
+    FlowFileOwnedResourceClaimPtr() = default;
+    explicit FlowFileOwnedResourceClaimPtr(const std::shared_ptr<ResourceClaim>& claim) : claim_(claim) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    explicit FlowFileOwnedResourceClaimPtr(std::shared_ptr<ResourceClaim>&& claim) : claim_(std::move(claim)) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    FlowFileOwnedResourceClaimPtr(const FlowFileOwnedResourceClaimPtr& ref) : claim_(ref.claim_) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    FlowFileOwnedResourceClaimPtr(FlowFileOwnedResourceClaimPtr&& ref) : claim_(std::move(ref.claim_)) {
+      // taking ownership of claim, no need to increment/decrement
+    }
+    FlowFileOwnedResourceClaimPtr& operator=(const FlowFileOwnedResourceClaimPtr& ref) = delete;
+    FlowFileOwnedResourceClaimPtr& operator=(FlowFileOwnedResourceClaimPtr&& ref) = delete;
+
+    FlowFileOwnedResourceClaimPtr& set(FlowFile& owner, const FlowFileOwnedResourceClaimPtr& ref) {
+      return set(owner, ref.claim_);
+    }
+    FlowFileOwnedResourceClaimPtr& set(FlowFile& owner, const std::shared_ptr<ResourceClaim>& newClaim) {
+      auto oldClaim = claim_;
+      claim_ = newClaim;
+      // the order of increase/release is important
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+      if (oldClaim) owner.releaseClaim(oldClaim);

Review comment:
   with refcount manipulation we always increment first, then decrement; this 
way we don't accidentally discard the object under ourselves. Note that an 
equality check would not suffice, as two `ResourceClaim` instances can 
reference the same file (they may have the same contentPath), as illustrated 
below
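
A self-contained toy version of that ordering (the `Claim` type is a
stand-in, not the real `ResourceClaim`): releasing first could momentarily
drop the count to zero and remove content the new claim still needs.

```cpp
#include <cstdio>
#include <memory>

// Toy model: claims referencing the same contentPath share one ref count.
struct Claim {
  std::shared_ptr<int> contentRefCount;
  void increase() { ++*contentRefCount; }
  void decrease() {
    if (--*contentRefCount <= 0) std::puts("content removed");  // file would be deleted
  }
};

// Swap claims safely: increase the new claim's count before releasing the
// old one; if both claims reference the same content, the count never
// touches zero in between, so the content is not removed under us.
void setClaim(std::shared_ptr<Claim>& current, const std::shared_ptr<Claim>& next) {
  auto old = current;
  current = next;
  if (current) current->increase();  // increase first
  if (old) old->decrease();          // then release
}
```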





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

2020-07-15 Thread GitBox


adamdebreceni commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r454839615



##
File path: libminifi/test/persistence-tests/PersistenceTests.cpp
##
@@ -0,0 +1,218 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
+#include 
+#include 
+#include 
+#include 
+
+#include "core/Core.h"
+#include "core/repository/AtomicRepoEntries.h"
+#include "core/RepositoryFactory.h"
+#include "FlowFileRecord.h"
+#include "FlowFileRepository.h"
+#include "properties/Configure.h"
+#include "../unit/ProvenanceTestHelper.h"
+#include "../TestBase.h"
+#include "../../extensions/libarchive/MergeContent.h"
+#include "../test/BufferReader.h"
+
+using Connection = minifi::Connection;
+using MergeContent = minifi::processors::MergeContent;
+
+struct TestFlow {
+  TestFlow(const std::shared_ptr<core::repository::FlowFileRepository>& ff_repository, const std::shared_ptr<core::ContentRepository>& content_repo, const std::shared_ptr<core::Repository>& prov_repo)
+      : ff_repository(ff_repository), content_repo(content_repo), prov_repo(prov_repo) {
+    std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider = nullptr;
+
+    // setup MERGE processor
+    {
+      merge = std::make_shared<MergeContent>("MergeContent", mergeProcUUID());
+      merge->initialize();
+      merge->setAutoTerminatedRelationships({{"original", "d"}});
+
+      merge->setProperty(MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE);
+      merge->setProperty(MergeContent::MergeStrategy, MERGE_STRATEGY_BIN_PACK);
+      merge->setProperty(MergeContent::DelimiterStrategy, DELIMITER_STRATEGY_TEXT);
+      merge->setProperty(MergeContent::MinEntries, "3");
+      merge->setProperty(MergeContent::Header, "_Header_");
+      merge->setProperty(MergeContent::Footer, "_Footer_");
+      merge->setProperty(MergeContent::Demarcator, "_Demarcator_");
+      merge->setProperty(MergeContent::MaxBinAge, "1 h");
+
+      std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(merge);
+      mergeContext = std::make_shared<core::ProcessContext>(node, controller_services_provider, prov_repo, ff_repository, content_repo);
+    }
+
+    // setup INPUT processor
+    {
+      inputProcessor = std::make_shared<core::Processor>("source", inputProcUUID());
+      std::shared_ptr<core::ProcessorNode> node = std::make_shared<core::ProcessorNode>(inputProcessor);
+      inputContext = std::make_shared<core::ProcessContext>(node, controller_services_provider, prov_repo, ff_repository, content_repo);
+    }
+
+    // setup Input Connection
+    {
+      input = std::make_shared<Connection>(ff_repository, content_repo, "Input", inputConnUUID());
+      input->setRelationship({"input", "d"});
+      input->setDestinationUUID(mergeProcUUID());
+      input->setSourceUUID(inputProcUUID());
+      inputProcessor->addConnection(input);
+    }
+
+    // setup Output Connection
+    {
+      output = std::make_shared<Connection>(ff_repository, content_repo, "Output", outputConnUUID());
+      output->setRelationship(MergeContent::Merge);
+      output->setSourceUUID(mergeProcUUID());
+    }
+
+    // setup ProcessGroup
+    {
+      root = std::make_shared<core::ProcessGroup>(core::ProcessGroupType::ROOT_PROCESS_GROUP, "root");
+      root->addProcessor(merge);
+      root->addConnection(input);
+      root->addConnection(output);
+    }
+
+    // prepare Merge Processor for execution
+    merge->setScheduledState(core::ScheduledState::RUNNING);
+    merge->onSchedule(mergeContext.get(), new core::ProcessSessionFactory(mergeContext));
+  }
+  void write(const std::string& data) {
+    minifi::io::DataStream stream(reinterpret_cast<const uint8_t*>(data.c_str()), data.length());
+    core::ProcessSession sessionGenFlowFile(inputContext);
+    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
+    sessionGenFlowFile.importFrom(stream, flow);
+    sessionGenFlowFile.transfer(flow, {"input", "d"});
+    sessionGenFlowFile.commit();

Review comment:
   done





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

2020-07-15 Thread GitBox


adamdebreceni commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r454836324



##
File path: libminifi/src/core/ProcessSession.cpp
##
@@ -540,111 +481,97 @@ void ProcessSession::import(const std::string& source,
   std::vector<uint8_t> buffer(getpagesize());
   try {
-    try {
-      std::ifstream input{source, std::ios::in | std::ios::binary};
-      logger_->log_debug("Opening %s", source);
-      if (!input.is_open() || !input.good()) {
-        throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: failed to open file \'", source, "\'"));
+    std::ifstream input{source, std::ios::in | std::ios::binary};
+    logger_->log_debug("Opening %s", source);
+    if (!input.is_open() || !input.good()) {
+      throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: failed to open file \'", source, "\'"));
+    }
+    if (offset != 0U) {
+      input.seekg(offset, std::ifstream::beg);
+      if (!input.good()) {
+        logger_->log_error("Seeking to %lu failed for file %s (does file/filesystem support seeking?)", offset, source);
+        throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: Couldn't seek to offset ", std::to_string(offset)));
       }
-      if (offset != 0U) {
-        input.seekg(offset, std::ifstream::beg);
-        if (!input.good()) {
-          logger_->log_error("Seeking to %lu failed for file %s (does file/filesystem support seeking?)", offset, source);
-          throw Exception(FILE_OPERATION_EXCEPTION, utils::StringUtils::join_pack("File Import Error: Couldn't seek to offset ", std::to_string(offset)));
-        }
+    }
+    uint64_t startTime = 0U;
+    while (input.good()) {
+      input.read(reinterpret_cast<char*>(buffer.data()), buffer.size());
+      std::streamsize read = input.gcount();
+      if (read < 0) {
+        throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount returned negative value");
       }
-      uint64_t startTime = 0U;
-      while (input.good()) {
-        input.read(reinterpret_cast<char*>(buffer.data()), buffer.size());
-        std::streamsize read = input.gcount();
-        if (read < 0) {
-          throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount returned negative value");
-        }
-        if (read == 0) {
-          logger_->log_trace("Finished reading input %s", source);
+      if (read == 0) {
+        logger_->log_trace("Finished reading input %s", source);
+        break;
+      } else {
+        logging::LOG_TRACE(logger_) << "Read input of " << read;
+      }
+      uint8_t* begin = buffer.data();
+      uint8_t* end = begin + read;
+      while (true) {
+        startTime = getTimeMillis();
+        uint8_t* delimiterPos = std::find(begin, end, static_cast<uint8_t>(inputDelimiter));
+        const auto len = gsl::narrow<int>(delimiterPos - begin);
+
+        logging::LOG_TRACE(logger_) << "Read input of " << read << " length is " << len << " is at end?" << (delimiterPos == end);
+        /*
+         * We do not want to process the rest of the buffer after the last delimiter if
+         *  - we have reached EOF in the file (we would discard it anyway)
+         *  - there is nothing to process (the last character in the buffer is a delimiter)
+         */
+        if (delimiterPos == end && (input.eof() || len == 0)) {
          break;
-        } else {
-          logging::LOG_TRACE(logger_) << "Read input of " << read;
        }
-        uint8_t* begin = buffer.data();
-        uint8_t* end = begin + read;
-        while (true) {
-          startTime = getTimeMillis();
-          uint8_t* delimiterPos = std::find(begin, end, static_cast<uint8_t>(inputDelimiter));
-          const auto len = gsl::narrow<int>(delimiterPos - begin);
-
-          logging::LOG_TRACE(logger_) << "Read input of " << read << " length is " << len << " is at end?" << (delimiterPos == end);
-          /*
-           * We do not want to process the rest of the buffer after the last delimiter if
-           *  - we have reached EOF in the file (we would discard it anyway)
-           *  - there is nothing to process (the last character in the buffer is a delimiter)
-           */
-          if (delimiterPos == end && (input.eof() || len == 0)) {
-            break;
-          }
-
-          /* Create claim and stream if needed and append data */
-          if (claim == nullptr) {
-            startTime = getTimeMillis();
-            claim = std::make_shared<ResourceClaim>(process_context_->getContentRepository());
-          }
-          if (stream == nullptr) {
-            stream = process_context_->getContentRepository()->write(claim);
-          }
-          if (stream == nullptr) {
-            logger_->log_error("Stream is null");
-            rollback();
-            return;
-          }
-          if (stream->write(begin, len) != len) {
-            logger_->log_error("Error while writ

[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

2020-07-15 Thread GitBox


adamdebreceni commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r454834083



##
File path: libminifi/src/FlowFileRecord.cpp
##
@@ -118,28 +110,27 @@ FlowFileRecord::~FlowFileRecord() {
     logger_->log_debug("Delete FlowFile UUID %s", uuidStr_);
   else
     logger_->log_debug("Delete SnapShot FlowFile UUID %s", uuidStr_);
-  if (claim_) {
-    releaseClaim(claim_);
-  } else {
+
+  if (!claim_) {
     logger_->log_debug("Claim is null ptr for %s", uuidStr_);
   }
 
+  claim_.set(*this, nullptr);
+
   // Disown stash claims
-  for (const auto &stashPair : stashedContent_) {
-    releaseClaim(stashPair.second);
+  for (auto &stashPair : stashedContent_) {
+    auto& stashClaim = stashPair.second;
+    stashClaim.set(*this, nullptr);
   }
 }
 
 void FlowFileRecord::releaseClaim(std::shared_ptr<ResourceClaim> claim) {
   // Decrease the flow file record owned count for the resource claim
-  claim_->decreaseFlowFileRecordOwnedCount();
-  std::string value;
-  logger_->log_debug("Delete Resource Claim %s, %s, attempt %llu", getUUIDStr(), claim_->getContentFullPath(), claim_->getFlowFileRecordOwnedCount());
-  if (claim_->getFlowFileRecordOwnedCount() <= 0) {
-    // we cannot rely on the stored variable here since we aren't guaranteed atomicity
-    if (flow_repository_ != nullptr && !flow_repository_->Get(uuidStr_, value)) {
-      logger_->log_debug("Delete Resource Claim %s", claim_->getContentFullPath());
-      content_repo_->remove(claim_);
+  claim->decreaseFlowFileRecordOwnedCount();
+  logger_->log_debug("Detaching Resource Claim %s, %s, attempt %llu", getUUIDStr(), claim->getContentFullPath(), claim->getFlowFileRecordOwnedCount());

Review comment:
   done





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [nifi-minifi-cpp] adamdebreceni commented on a change in pull request #807: MINIFICPP-1228 - Resource ownership refactor + flowFile-owning processors.

2020-07-15 Thread GitBox


adamdebreceni commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r454833537



##
File path: libminifi/src/core/ProcessSession.cpp
##
@@ -248,35 +237,27 @@ void ProcessSession::penalize(const std::shared_ptr<core::FlowFile> &flow) {
 void ProcessSession::transfer(const std::shared_ptr<core::FlowFile> &flow, Relationship relationship) {
   logging::LOG_INFO(logger_) << "Transferring " << flow->getUUIDStr() << " from " << process_context_->getProcessorNode()->getName() << " to relationship " << relationship.getName();
   _transferRelationship[flow->getUUIDStr()] = relationship;
+  flow->setDeleted(false);

Review comment:
   during `commit` and `rollback` we check whether the items in 
`_deletedFlowFiles` indeed stayed deleted, or whether a transfer or add marked 
them for "resurrection" (see the sketch below)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org