[jira] [Commented] (NIFI-10895) MiNiFi C2 - Implement UPDATE/PROPERTIES command
[ https://issues.apache.org/jira/browse/NIFI-10895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17677515#comment-17677515 ]

ASF subversion and git services commented on NIFI-10895:

Commit 2abb8921e7a990715f38d1615b930055747467ef in nifi's branch refs/heads/main from Ferenc Erdei
[ https://gitbox.apache.org/repos/asf?p=nifi.git;h=2abb8921e7 ]

NIFI-10895 Handle minifi restarts more gracefully

Signed-off-by: Csaba Bejan

This closes #6852.

> MiNiFi C2 - Implement UPDATE/PROPERTIES command
> ---
>
> Key: NIFI-10895
> URL: https://issues.apache.org/jira/browse/NIFI-10895
> Project: Apache NiFi
> Issue Type: New Feature
> Components: C2, MiNiFi
> Reporter: Ferenc Erdei
> Assignee: Ferenc Erdei
> Priority: Minor
> Time Spent: 3h 20m
> Remaining Estimate: 0h
>
> Implement an UPDATE/PROPERTIES command which will allow the agent to update
> bootstrap properties using the C2 protocol.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [nifi] bejancsaba closed pull request #6852: NIFI-10895 Handle minifi restarts more gracefully
bejancsaba closed pull request #6852: NIFI-10895 Handle minifi restarts more gracefully URL: https://github.com/apache/nifi/pull/6852 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi] ferencerdei opened a new pull request, #6852: NIFI-10895 Handle minifi restarts more gracefully
ferencerdei opened a new pull request, #6852:
URL: https://github.com/apache/nifi/pull/6852

# Summary

In some cases the MiNiFi process was not stopped correctly, which prevented the bootstrap from restarting it.

[NIFI-10895](https://issues.apache.org/jira/browse/NIFI-10895)

# Tracking

Please complete the following tracking steps prior to pull request creation.

### Issue Tracking

- [ ] [Apache NiFi Jira](https://issues.apache.org/jira/browse/NIFI) issue created

### Pull Request Tracking

- [ ] Pull Request title starts with Apache NiFi Jira issue number, such as `NIFI-0`
- [ ] Pull Request commit message starts with Apache NiFi Jira issue number, such as `NIFI-0`

### Pull Request Formatting

- [ ] Pull Request based on current revision of the `main` branch
- [ ] Pull Request refers to a feature branch with one commit containing changes

# Verification

Please indicate the verification steps performed prior to pull request creation.

### Build

- [ ] Build completed using `mvn clean install -P contrib-check`
  - [ ] JDK 8
  - [ ] JDK 11
  - [ ] JDK 17

### Licensing

- [ ] New dependencies are compatible with the [Apache License 2.0](https://apache.org/licenses/LICENSE-2.0) according to the [License Policy](https://www.apache.org/legal/resolved.html)
- [ ] New dependencies are documented in applicable `LICENSE` and `NOTICE` files

### Documentation

- [ ] Documentation formatting appears as expected in rendered files
[jira] [Comment Edited] (NIFI-11035) Replace remaining JUnit 4 assertions in nifi-commons with JUnit 5 assertions
[ https://issues.apache.org/jira/browse/NIFI-11035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17677493#comment-17677493 ]

Daniel Stieglitz edited comment on NIFI-11035 at 1/16/23 7:04 PM:
--

[~exceptionfactory] Thanks for the clarification. I have a specific question about converting to JUnit 5 assertions. In nifi/nifi-commons/nifi-utils/src/test/groovy/org/apache/nifi/util/TestFormatUtilsGroovy.groovy, in the test testMakeWholeNumberTimeShouldHandleMetricConversions, I tried replacing

{code:java}
results.every { String key, List values ->
    assert values.first() == SCENARIOS[key].expectedValue
    assert values.last() == SCENARIOS[key].expectedUnits
}
{code}

with

{code:java}
results.entrySet().forEach(entry -> {
    assertEquals(SCENARIOS.get(entry.getKey()).expectedValue, entry.getValue().get(0))
    assertEquals(SCENARIOS.get(entry.getKey()).expectedUnits, entry.getValue().get(1))
})
{code}

When I do that, the unit test fails with

{code:java}
junit.framework.AssertionFailedError: expected:<75> but was:<750>
Expected :75
Actual :750
{code}

When I added a log statement within the original Groovy code (in the closure), I noticed that "every" does not iterate through all the results, only the first one, which is not what "every" is supposed to do. That explains why the unit test passed, although it seems it really should not have. Do you have any insight as to why I would see this behavior?

was (Author: JIRAUSER294662):

[~exceptionfactory] Thanks for the clarification. I have a specific question about converting to JUnit 5 assertions. In nifi/nifi-commons/nifi-utils/src/test/groovy/org/apache/nifi/util/TestFormatUtilsGroovy.groovy, in the test testMakeWholeNumberTimeShouldHandleMetricConversions, I tried replacing

{code:java}
results.every { String key, List values ->
    assert values.first() == SCENARIOS[key].expectedValue
    assert values.last() == SCENARIOS[key].expectedUnits
}
{code}

with

{code:java}
results.entrySet().forEach(entry -> {
    assertEquals(SCENARIOS.get(entry.getKey()).expectedValue, entry.getValue().get(0))
    assertEquals(SCENARIOS.get(entry.getKey()).expectedUnits, entry.getValue().get(1))
})
{code}

When I do that, the unit test fails with

{code:java}
junit.framework.AssertionFailedError: expected:<75> but was:<750>
Expected :75
Actual :750
{code}

When I added a log statement within the original Groovy code (in the closure), I noticed that "every" does not iterate through all the results, only the first one, which is not what "every" is supposed to do. Do you have any insight as to why I would see this behavior?

> Replace remaining JUnit 4 assertions in nifi-commons with JUnit 5 assertions
>
> Key: NIFI-11035
> URL: https://issues.apache.org/jira/browse/NIFI-11035
> Project: Apache NiFi
> Issue Type: Sub-task
> Reporter: Daniel Stieglitz
> Assignee: Daniel Stieglitz
> Priority: Minor
>
> A quick search of nifi-commons for "org.junit.Assert" indicates there are over
> 50 cases where JUnit 4 assertions are still being used. These should be
> replaced with JUnit 5 assertions.
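Editorial note on the question above. One likely explanation, offered as an assumption rather than something confirmed in this thread: a Groovy closure returns the value of its last statement, an `assert` statement yields null, and null is false under Groovy truth, so `every` stops after the first entry and returns false, a result the test then discards. The same short-circuit can be reproduced in Python with `all()`. The names `EXPECTED`, `RESULTS` and `check` below are hypothetical stand-ins, not taken from TestFormatUtilsGroovy.

```python
# Hypothetical stand-ins for the test's SCENARIOS and results maps.
EXPECTED = {"first": 75, "second": 75}
RESULTS = {"first": [75], "second": [750]}  # the second entry is wrong on purpose


def check(key, values, visited):
    """Assert like the original closure and, like it, return nothing (None)."""
    visited.append(key)
    assert values[0] == EXPECTED[key]


# Short-circuiting form: all() stops at the first falsy result, and None is falsy.
visited_by_all = []
outcome = all(check(k, v, visited_by_all) for k, v in RESULTS.items())

# Plain loop: every entry is checked, so the bad second entry is caught.
visited_by_loop = []
loop_failed = False
try:
    for k, v in RESULTS.items():
        check(k, v, visited_by_loop)
except AssertionError:
    loop_failed = True

print(visited_by_all, outcome)       # ['first'] False
print(visited_by_loop, loop_failed)  # ['first', 'second'] True
```

If this explanation holds, the original test only ever verified the first scenario, and the failure seen after switching to `forEach` with `assertEquals` points at a real mismatch rather than at a conversion mistake.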
[jira] [Commented] (NIFI-11035) Replace remaining JUnit 4 assertions in nifi-commons with JUnit 5 assertions
[ https://issues.apache.org/jira/browse/NIFI-11035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17677493#comment-17677493 ]

Daniel Stieglitz commented on NIFI-11035:
-

[~exceptionfactory] Thanks for the clarification. I have a specific question about converting to JUnit 5 assertions. In nifi/nifi-commons/nifi-utils/src/test/groovy/org/apache/nifi/util/TestFormatUtilsGroovy.groovy, in the test testMakeWholeNumberTimeShouldHandleMetricConversions, I tried replacing

{code:java}
results.every { String key, List values ->
    assert values.first() == SCENARIOS[key].expectedValue
    assert values.last() == SCENARIOS[key].expectedUnits
}
{code}

with

{code:java}
results.entrySet().forEach(entry -> {
    assertEquals(SCENARIOS.get(entry.getKey()).expectedValue, entry.getValue().get(0))
    assertEquals(SCENARIOS.get(entry.getKey()).expectedUnits, entry.getValue().get(1))
})
{code}

When I do that, the unit test fails with

{code:java}
junit.framework.AssertionFailedError: expected:<75> but was:<750>
Expected :75
Actual :750
{code}

When I threw a log statement within the original Groovy code (in the closure), I noticed that "every" does not iterate through all the results, only the first one, which is not what "every" is supposed to do. Do you have any insight as to why I would see this behavior?

> Replace remaining JUnit 4 assertions in nifi-commons with JUnit 5 assertions
>
> Key: NIFI-11035
> URL: https://issues.apache.org/jira/browse/NIFI-11035
> Project: Apache NiFi
> Issue Type: Sub-task
> Reporter: Daniel Stieglitz
> Assignee: Daniel Stieglitz
> Priority: Minor
>
> A quick search of nifi-commons for "org.junit.Assert" indicates there are over
> 50 cases where JUnit 4 assertions are still being used. These should be
> replaced with JUnit 5 assertions.
[jira] [Comment Edited] (NIFI-11035) Replace remaining JUnit 4 assertions in nifi-commons with JUnit 5 assertions
[ https://issues.apache.org/jira/browse/NIFI-11035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17677493#comment-17677493 ]

Daniel Stieglitz edited comment on NIFI-11035 at 1/16/23 7:03 PM:
--

[~exceptionfactory] Thanks for the clarification. I have a specific question about converting to JUnit 5 assertions. In nifi/nifi-commons/nifi-utils/src/test/groovy/org/apache/nifi/util/TestFormatUtilsGroovy.groovy, in the test testMakeWholeNumberTimeShouldHandleMetricConversions, I tried replacing

{code:java}
results.every { String key, List values ->
    assert values.first() == SCENARIOS[key].expectedValue
    assert values.last() == SCENARIOS[key].expectedUnits
}
{code}

with

{code:java}
results.entrySet().forEach(entry -> {
    assertEquals(SCENARIOS.get(entry.getKey()).expectedValue, entry.getValue().get(0))
    assertEquals(SCENARIOS.get(entry.getKey()).expectedUnits, entry.getValue().get(1))
})
{code}

When I do that, the unit test fails with

{code:java}
junit.framework.AssertionFailedError: expected:<75> but was:<750>
Expected :75
Actual :750
{code}

When I added a log statement within the original Groovy code (in the closure), I noticed that "every" does not iterate through all the results, only the first one, which is not what "every" is supposed to do. Do you have any insight as to why I would see this behavior?

was (Author: JIRAUSER294662):

[~exceptionfactory] Thanks for the clarification. I have a specific question about converting to JUnit 5 assertions. In nifi/nifi-commons/nifi-utils/src/test/groovy/org/apache/nifi/util/TestFormatUtilsGroovy.groovy, in the test testMakeWholeNumberTimeShouldHandleMetricConversions, I tried replacing

{code:java}
results.every { String key, List values ->
    assert values.first() == SCENARIOS[key].expectedValue
    assert values.last() == SCENARIOS[key].expectedUnits
}
{code}

with

{code:java}
results.entrySet().forEach(entry -> {
    assertEquals(SCENARIOS.get(entry.getKey()).expectedValue, entry.getValue().get(0))
    assertEquals(SCENARIOS.get(entry.getKey()).expectedUnits, entry.getValue().get(1))
})
{code}

When I do that, the unit test fails with

{code:java}
junit.framework.AssertionFailedError: expected:<75> but was:<750>
Expected :75
Actual :750
{code}

When I threw a log statement within the original Groovy code (in the closure), I noticed that "every" does not iterate through all the results, only the first one, which is not what "every" is supposed to do. Do you have any insight as to why I would see this behavior?

> Replace remaining JUnit 4 assertions in nifi-commons with JUnit 5 assertions
>
> Key: NIFI-11035
> URL: https://issues.apache.org/jira/browse/NIFI-11035
> Project: Apache NiFi
> Issue Type: Sub-task
> Reporter: Daniel Stieglitz
> Assignee: Daniel Stieglitz
> Priority: Minor
>
> A quick search of nifi-commons for "org.junit.Assert" indicates there are over
> 50 cases where JUnit 4 assertions are still being used. These should be
> replaced with JUnit 5 assertions.
[GitHub] [nifi] nandorsoma commented on a diff in pull request #6832: NIFI-10965 PutGoogleDrive
nandorsoma commented on code in PR #6832:
URL: https://github.com/apache/nifi/pull/6832#discussion_r1071311474

## nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/FetchGoogleDrive.java: ##
@@ -125,9 +145,17 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
         String fileId = context.getProperty(FILE_ID).evaluateAttributeExpressions(flowFile).getValue();

         FlowFile outFlowFile = flowFile;
+        final long startNanos = System.nanoTime();
         try {
             outFlowFile = fetchFile(fileId, session, outFlowFile);

+            File fileMetadata = fetchFileMetadata(fileId);

Review Comment:
   Minor, but `final` is missing here, and also in line 145. I know this part is not modified in this PR, but do we know why we have an outFlowFile alias?

## nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/GoogleDriveAttributes.java: ##
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.drive;
+
+public class GoogleDriveAttributes {
+
+    public static final String ID = "drive.id";
+    public static final String ID_DESC = "The id of the file";
+
+    public static final String FILENAME = "filename";

Review Comment:
   I know this part is refactored and not new, but is the "drive" prefix missing on purpose?

## nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/drive/PutGoogleDrive.java: ##
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.drive;
+
+/*
+ * This processor uploads objects to Google Drive.
+ */ + +import static java.lang.String.format; +import static java.lang.String.valueOf; +import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static java.util.stream.Collectors.joining; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_CODE_DESC; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_MESSAGE; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ERROR_MESSAGE_DESC; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.FILENAME; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.FILENAME_DESC; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.ID_DESC; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.MIME_TYPE; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.MIME_TYPE_DESC; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.SIZE_DESC; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP; +import static org.apache.nifi.processors.gcp.drive.GoogleDriveAttributes.TIMESTAMP_DESC; +import static org.apache.nifi.processors.gcp.util.GoogleUtils.GCP_CREDENTIALS_PROVIDER_SERVICE; + +import com.google.api.client.googleapis.json.GoogleJsonRespo
[GitHub] [nifi-minifi-cpp] lordgamez opened a new pull request, #1493: MINIFICPP-2004 Remove default property values from kubernetes examples
lordgamez opened a new pull request, #1493:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1493

https://issues.apache.org/jira/browse/MINIFICPP-2004

Thank you for submitting a contribution to Apache NiFi - MiNiFi C++.

In order to streamline the review of the contribution we ask you to ensure the following steps have been taken:

### For all changes:

- [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
- [ ] Does your PR title start with MINIFICPP-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
- [ ] Has your PR been rebased against the latest commit within the target branch (typically main)?
- [ ] Is your initial contribution a single, squashed commit?

### For code changes:

- [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the LICENSE file?
- [ ] If applicable, have you updated the NOTICE file?

### For documentation related changes:

- [ ] Have you ensured that format looks appropriate for the output in which it is rendered?

### Note:

Please ensure that once the PR is submitted, you check GitHub Actions CI results for build issues and submit an update to your PR as soon as possible.
[GitHub] [nifi-minifi-cpp] szaszm commented on a diff in pull request #1456: MINIFICPP-1965 Add CMAKE flags to select malloc implementation
szaszm commented on code in PR #1456:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1456#discussion_r1071415382

## docker/test/integration/MiNiFi_integration_test_driver.py: ##
@@ -311,3 +311,15 @@ def check_metric_class_on_prometheus(self, metric_class, timeout_seconds):

     def check_processor_metric_on_prometheus(self, metric_class, timeout_seconds, processor_name):
         assert self.cluster.wait_for_processor_metric_on_prometheus(metric_class, timeout_seconds, processor_name)
+
+    def check_minimum_peak_memory_usage(self, minimum_peak_memory_usage: int, timeout_seconds: int) -> None:
+        assert self.cluster.wait_for_peak_memory_usage_to_exceed(minimum_peak_memory_usage, timeout_seconds)
+
+    def check_maximum_memory_usage(self, maximum_memory_usage: int, timeout_seconds: int) -> None:
+        assert self.cluster.wait_for_memory_usage_to_drop_below(maximum_memory_usage, timeout_seconds)

Review Comment:
   I think the rename would make sense here as well
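Editorial note: the `wait_for_*` helpers called in the diff above are not shown in this message. The following is only a minimal sketch of how a timeout-bounded polling helper of that kind might look. The `read_memory_usage` callable and the `poll_interval_seconds` parameter are assumptions made for illustration and are not part of the MiNiFi test framework.

```python
import time
from typing import Callable


def wait_for_memory_usage_to_drop_below(read_memory_usage: Callable[[], int],
                                        max_memory_usage: int,
                                        timeout_seconds: float,
                                        poll_interval_seconds: float = 0.01) -> bool:
    """Poll until usage drops below the limit or the timeout expires (hypothetical helper)."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        if read_memory_usage() < max_memory_usage:
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval_seconds)


# Simulated readings standing in for a real container memory probe.
readings = iter([900, 700, 400])
dropped = wait_for_memory_usage_to_drop_below(lambda: next(readings), 500, timeout_seconds=1)
never_dropped = wait_for_memory_usage_to_drop_below(lambda: 900, 500, timeout_seconds=0.05)
```

Returning a boolean instead of raising keeps the helper usable behind a plain `assert`, which is how the `check_*` methods in the diff consume it.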
[jira] [Assigned] (MINIFICPP-2027) Upgrade google cloud library to latest version
[ https://issues.apache.org/jira/browse/MINIFICPP-2027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gábor Gyimesi reassigned MINIFICPP-2027:

Assignee: (was: Gábor Gyimesi)

> Upgrade google cloud library to latest version
> --
>
> Key: MINIFICPP-2027
> URL: https://issues.apache.org/jira/browse/MINIFICPP-2027
> Project: Apache NiFi MiNiFi C++
> Issue Type: Improvement
> Reporter: Gábor Gyimesi
> Priority: Minor
>
> Google cloud cpp library has had a major version change since we introduced
> it in our project. It also had a major breaking change in the storage mocking
> library which would simplify our tests, but would require some effort on our
> side: [https://github.com/googleapis/google-cloud-cpp/pull/8806/files]
> -The upgrade may also fix MSVC compile warnings:
> [https://github.com/apache/nifi-minifi-cpp/actions/runs/3856956550/jobs/6573795651]
> We should check if the upgrade solves the issue otherwise we should look for
> another solution.-
[jira] [Updated] (MINIFICPP-2027) Upgrade google cloud library to latest version
[ https://issues.apache.org/jira/browse/MINIFICPP-2027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gábor Gyimesi updated MINIFICPP-2027: - Description: Google cloud cpp library has had a major version change since we introduced it in our project. It also had a major breaking change in the storage mocking library which would simplify our tests, but would require some effort on our side: [https://github.com/googleapis/google-cloud-cpp/pull/8806/files] -The upgrade may also fix MSVC compile warnings: [https://github.com/apache/nifi-minifi-cpp/actions/runs/3856956550/jobs/6573795651] We should check if the upgrade solves the issue otherwise we should look for another solution.- was: Google cloud cpp library has had a major version change since we introduced it in our project. It also had a major breaking change in the storage mocking library which would simplify our tests, but would require some effort on our side: [https://github.com/googleapis/google-cloud-cpp/pull/8806/files] The upgrade may also fix MSVC compile warnings: [https://github.com/apache/nifi-minifi-cpp/actions/runs/3856956550/jobs/6573795651] We should check if the upgrade solves the issue otherwise we should look for another solution. > Upgrade google cloud library to latest version > -- > > Key: MINIFICPP-2027 > URL: https://issues.apache.org/jira/browse/MINIFICPP-2027 > Project: Apache NiFi MiNiFi C++ > Issue Type: Improvement >Reporter: Gábor Gyimesi >Assignee: Gábor Gyimesi >Priority: Minor > > Google cloud cpp library has had a major version change since we introduced > it in our project. 
It also had a major breaking change in the storage mocking > library which would simplify our tests, but would require some effort on our > side: [https://github.com/googleapis/google-cloud-cpp/pull/8806/files] > -The upgrade may also fix MSVC compile warnings: > [https://github.com/apache/nifi-minifi-cpp/actions/runs/3856956550/jobs/6573795651] > We should check if the upgrade solves the issue otherwise we should look for > another solution.- -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (NIFI-11049) PutDropboxTest is an unreliable test
[ https://issues.apache.org/jira/browse/NIFI-11049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Peter Turcsanyi resolved NIFI-11049. Fix Version/s: 1.20.0 Resolution: Fixed > PutDropboxTest is an unreliable test > > > Key: NIFI-11049 > URL: https://issues.apache.org/jira/browse/NIFI-11049 > Project: Apache NiFi > Issue Type: Bug > Environment: Apache Maven 3.8.7 > (b89d5959fcde851dcb1c8946a785a163f14e1e29) > Maven home: /development/tools/apache-maven-3.8.7 > Java version: 1.8.0_352, vendor: Azul Systems, Inc., runtime: > /usr/lib/jvm/zulu8.66.0.15-ca-jdk8.0.352-linux_x64/jre > Default locale: en_US, platform encoding: UTF-8 > OS name: "linux", version: "6.0.18-300.fc37.x86_64", arch: "amd64", family: > "unix" >Reporter: Joe Witt >Assignee: Zsihovszki Krisztina >Priority: Major > Fix For: 1.20.0 > > Time Spent: 50m > Remaining Estimate: 0h > > [ERROR] Tests run: 11, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: > 1.024 s <<< FAILURE! - in org.apache.nifi.processors.dropbox.PutDropboxTest > [ERROR] > org.apache.nifi.processors.dropbox.PutDropboxTest.testFileUploadLargeFile > Time elapsed: 0.132 s <<< FAILURE! 
> org.opentest4j.AssertionFailedError: Expected all Transferred FlowFiles to go > to success but 1 were routed to failure > at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) > at org.junit.jupiter.api.Assertions.fail(Assertions.java:135) > at > org.apache.nifi.util.MockProcessSession.assertAllFlowFilesTransferred(MockProcessSession.java:1294) > at > org.apache.nifi.util.StandardProcessorTestRunner.assertAllFlowFilesTransferred(StandardProcessorTestRunner.java:310) > at > org.apache.nifi.util.StandardProcessorTestRunner.assertAllFlowFilesTransferred(StandardProcessorTestRunner.java:389) > at > org.apache.nifi.processors.dropbox.PutDropboxTest.testFileUploadLargeFile(PutDropboxTest.java:274) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727) > at > org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86) > at > org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) > at > org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) > at > 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) > at > org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92) > at > org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:217) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:213) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:138) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151) > at > org.junit.platform.engine.support.hierarchical.Throwa
[jira] [Commented] (NIFI-11049) PutDropboxTest is an unreliable test
[ https://issues.apache.org/jira/browse/NIFI-11049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17677450#comment-17677450 ] ASF subversion and git services commented on NIFI-11049: Commit b7071cbb04c54340cd39372e8a024542a1f6974a in nifi's branch refs/heads/main from krisztina-zsihovszki [ https://gitbox.apache.org/repos/asf?p=nifi.git;h=b7071cbb04 ] NIFI-11049 Fixed flaky PutDropboxTest, attribute description corrected This closes #6850. Signed-off-by: Peter Turcsanyi > PutDropboxTest is an unreliable test > > > Key: NIFI-11049 > URL: https://issues.apache.org/jira/browse/NIFI-11049 > Project: Apache NiFi > Issue Type: Bug > Environment: Apache Maven 3.8.7 > (b89d5959fcde851dcb1c8946a785a163f14e1e29) > Maven home: /development/tools/apache-maven-3.8.7 > Java version: 1.8.0_352, vendor: Azul Systems, Inc., runtime: > /usr/lib/jvm/zulu8.66.0.15-ca-jdk8.0.352-linux_x64/jre > Default locale: en_US, platform encoding: UTF-8 > OS name: "linux", version: "6.0.18-300.fc37.x86_64", arch: "amd64", family: > "unix" >Reporter: Joe Witt >Assignee: Zsihovszki Krisztina >Priority: Major > Time Spent: 40m > Remaining Estimate: 0h > > [ERROR] Tests run: 11, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: > 1.024 s <<< FAILURE! - in org.apache.nifi.processors.dropbox.PutDropboxTest > [ERROR] > org.apache.nifi.processors.dropbox.PutDropboxTest.testFileUploadLargeFile > Time elapsed: 0.132 s <<< FAILURE! 
> org.opentest4j.AssertionFailedError: Expected all Transferred FlowFiles to go > to success but 1 were routed to failure > at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) > at org.junit.jupiter.api.Assertions.fail(Assertions.java:135) > at > org.apache.nifi.util.MockProcessSession.assertAllFlowFilesTransferred(MockProcessSession.java:1294) > at > org.apache.nifi.util.StandardProcessorTestRunner.assertAllFlowFilesTransferred(StandardProcessorTestRunner.java:310) > at > org.apache.nifi.util.StandardProcessorTestRunner.assertAllFlowFilesTransferred(StandardProcessorTestRunner.java:389) > at > org.apache.nifi.processors.dropbox.PutDropboxTest.testFileUploadLargeFile(PutDropboxTest.java:274) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727) > at > org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86) > at > org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) > at > org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) > at > 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) > at > org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92) > at > org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:217) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:213) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:138) >
[GitHub] [nifi] asfgit closed pull request #6850: NIFI-11049 Fixed flaky PutDropboxTest, attribute description corrected
asfgit closed pull request #6850: NIFI-11049 Fixed flaky PutDropboxTest, attribute description corrected URL: https://github.com/apache/nifi/pull/6850
[jira] [Assigned] (MINIFICPP-2004) Remove default property values from kubernetes examples
[ https://issues.apache.org/jira/browse/MINIFICPP-2004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Gábor Gyimesi reassigned MINIFICPP-2004:
    Assignee: Gábor Gyimesi
> Remove default property values from kubernetes examples
> ---
>
> Key: MINIFICPP-2004
> URL: https://issues.apache.org/jira/browse/MINIFICPP-2004
> Project: Apache NiFi MiNiFi C++
> Issue Type: Improvement
> Reporter: Gábor Gyimesi
> Assignee: Gábor Gyimesi
> Priority: Trivial
>
> Update kubernetes example yaml files with [~szaszm]'s suggestion from
> [https://github.com/apache/nifi-minifi-cpp/pull/1460] :
> "I think the property is not relevant here, so I would leave it out from the
> example. I would ideally also remove all other properties that are using
> their default values, to keep the example minimal"
[GitHub] [nifi] turcsanyip commented on pull request #6850: NIFI-11049 Fixed flaky PutDropboxTest, attribute description corrected
turcsanyip commented on PR #6850: URL: https://github.com/apache/nifi/pull/6850#issuecomment-1384223327 @nandorsoma Thanks for your review! Merging to main...
[GitHub] [nifi] exceptionfactory commented on a diff in pull request #6846: NIFI-10991 Add AWS MSK IAM support to Kafka processors
exceptionfactory commented on code in PR #6846: URL: https://github.com/apache/nifi/pull/6846#discussion_r1071371445 ## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java: ## @@ -107,6 +107,35 @@ public interface KafkaClientComponent { ) .build(); +PropertyDescriptor AWS_PROFILE_NAME = new PropertyDescriptor.Builder() +.name("aws.profile.name") +.displayName("AWS Profile Name") +.description("The AWS Profile to consider when there are multiple profiles available.") +.dependsOn( +SASL_MECHANISM, +SaslMechanism.AWS_MSK_IAM +) +.required(false) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.build(); + +PropertyDescriptor AWS_DEBUG_CREDS = new PropertyDescriptor.Builder() +.name("aws.debug.creds") +.displayName("Debug AWS Credentials") +.description("This property helps to debug which AWS credential is being exactly used. If this property is set to true " ++ "and `software.amazon.msk.auth.iam.internals.MSKCredentialProvider` logger is set to DEBUG, a log will be printed " ++ "including IAM Account, IAM user id and the ARN of the IAM Principal corresponding to the credential being used. " ++ "It is recommended to use this property only during debug since it makes an additional remote call.") Review Comment: One potential option could be to pass the debug status for the associated Processor's ComponentLog, although that would require some additional wiring. Either way, I agree that this is not ideal as a Processor property.
[GitHub] [nifi] exceptionfactory commented on a diff in pull request #6846: NIFI-10991 Add AWS MSK IAM support to Kafka processors
exceptionfactory commented on code in PR #6846: URL: https://github.com/apache/nifi/pull/6846#discussion_r1071357824 ## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java: ## @@ -107,6 +107,35 @@ public interface KafkaClientComponent { ) .build(); +PropertyDescriptor AWS_PROFILE_NAME = new PropertyDescriptor.Builder() +.name("aws.profile.name") +.displayName("AWS Profile Name") +.description("The AWS Profile to consider when there are multiple profiles available.") Review Comment: ```suggestion .description("The Amazon Web Services Profile to select when multiple profiles are available.") ``` ## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java: ## @@ -107,6 +107,35 @@ public interface KafkaClientComponent { ) .build(); +PropertyDescriptor AWS_PROFILE_NAME = new PropertyDescriptor.Builder() +.name("aws.profile.name") +.displayName("AWS Profile Name") +.description("The AWS Profile to consider when there are multiple profiles available.") +.dependsOn( +SASL_MECHANISM, +SaslMechanism.AWS_MSK_IAM +) +.required(false) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.build(); + +PropertyDescriptor AWS_DEBUG_CREDS = new PropertyDescriptor.Builder() Review Comment: Recommend removing this property descriptor and associated references since it is not necessary or recommended for standard operations.
## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/property/provider/StandardKafkaPropertyProvider.java: ## @@ -160,9 +165,17 @@ private void setProperty(final Map properties, final String prop } } -private boolean isCustomKerberosLoginFound() { +private static boolean isCustomKerberosLoginFound() { +return isClassFound(SASL_GSSAPI_CUSTOM_LOGIN_CLASS); +} + +public static boolean isIAMCallbackHandlerFound() { +return isClassFound(SASL_AWS_MSK_IAM_CLIENT_CALLBACK_HANDLER_CLASS); +} + +private static boolean isClassFound(String clazz) { Review Comment: ```suggestion private static boolean isClassFound(final String className) { ``` ## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java: ## @@ -107,6 +107,35 @@ public interface KafkaClientComponent { ) .build(); +PropertyDescriptor AWS_PROFILE_NAME = new PropertyDescriptor.Builder() +.name("aws.profile.name") +.displayName("AWS Profile Name") +.description("The AWS Profile to consider when there are multiple profiles available.") +.dependsOn( +SASL_MECHANISM, +SaslMechanism.AWS_MSK_IAM +) +.required(false) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) Review Comment: Recommend removing support for Variable Registry expressions because this feature is subject to removal and Parameters should be used instead. 
```suggestion ``` ## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/validation/KafkaClientCustomValidationFunction.java: ## @@ -233,6 +235,19 @@ private void validateUsernamePassword(final ValidationContext validationContext, } } +private void validateAWSIAMMechanism(final ValidationContext validationContext, final Collection results) { +final String saslMechanism = validationContext.getProperty(SASL_MECHANISM).getValue(); + +if (SaslMechanism.AWS_MSK_IAM.getValue().equals(saslMechanism) && !StandardKafkaPropertyProvider.isIAMCallbackHandlerFound()) { +final String explanation = String.format("[%s] should be on classpath", StandardKafkaPropertyProvider.SASL_AWS_MSK_IAM_CLIENT_CALLBACK_HANDLER_CLASS); Review Comment: ```suggestion final String explanation = String.format("[%s] required class not found: Kafka modules must be compiled with AWS MSK enabled", StandardKafkaPropertyProvider.SASL_AWS_MSK_IAM_CLIENT_CALLBACK_HANDLER_CLASS); ``` ## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/login/IAMLoginConfigProvider.java: ## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + *
[jira] [Updated] (NIFI-11059) Add PutBoxFile processor
[ https://issues.apache.org/jira/browse/NIFI-11059?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Zsihovszki Krisztina updated NIFI-11059:
    Summary: Add PutBoxFile processor (was: Add PutBox processor)
> Add PutBoxFile processor
>
> Key: NIFI-11059
> URL: https://issues.apache.org/jira/browse/NIFI-11059
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Reporter: Zsihovszki Krisztina
> Assignee: Zsihovszki Krisztina
> Priority: Major
>
> Add a processor that is able to upload files to a Box folder.
[GitHub] [nifi] turcsanyip commented on a diff in pull request #6846: NIFI-10991 Add AWS MSK IAM support to Kafka processors
turcsanyip commented on code in PR #6846: URL: https://github.com/apache/nifi/pull/6846#discussion_r1071346246 ## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-shared/src/main/java/org/apache/nifi/kafka/shared/component/KafkaClientComponent.java: ## @@ -107,6 +107,35 @@ public interface KafkaClientComponent { ) .build(); +PropertyDescriptor AWS_PROFILE_NAME = new PropertyDescriptor.Builder() +.name("aws.profile.name") +.displayName("AWS Profile Name") +.description("The AWS Profile to consider when there are multiple profiles available.") +.dependsOn( +SASL_MECHANISM, +SaslMechanism.AWS_MSK_IAM +) +.required(false) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.build(); + +PropertyDescriptor AWS_DEBUG_CREDS = new PropertyDescriptor.Builder() +.name("aws.debug.creds") +.displayName("Debug AWS Credentials") +.description("This property helps to debug which AWS credential is being exactly used. If this property is set to true " ++ "and `software.amazon.msk.auth.iam.internals.MSKCredentialProvider` logger is set to DEBUG, a log will be printed " ++ "including IAM Account, IAM user id and the ARN of the IAM Principal corresponding to the credential being used. " ++ "It is recommended to use this property only during debug since it makes an additional remote call.") Review Comment: @nandorsoma Thanks for adding this new feature! I just had a look at the code and would like to add a note here: I would avoid adding "debug" properties to processors because they are related to troubleshooting, not to the business functionality. Would it not be possible to make it configurable in some other way? (e.g. a Java system property documented on the Additional Details page)
[GitHub] [nifi-minifi-cpp] fgerlits commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5
fgerlits commented on code in PR #1432: URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1071320280 ## extensions/mqtt/processors/AbstractMQTTProcessor.cpp: ## @@ -137,49 +148,317 @@ void AbstractMQTTProcessor::onSchedule(const std::shared_ptrlog_error("MQTT client is not existing while trying to reconnect"); +throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "MQTT client is not existing while trying to reconnect"); + } + if (MQTTAsync_isConnected(client_)) { +logger_->log_debug("Already connected to %s, no need to reconnect", uri_); return; } + + MQTTAsync_connectOptions connect_options; + MQTTProperties connect_properties = MQTTProperties_initializer; + MQTTProperties will_properties = MQTTProperties_initializer; + + ConnectFinishedTask connect_finished_task( + [this] (MQTTAsync_successData* success_data, MQTTAsync_successData5* success_data_5, MQTTAsync_failureData* failure_data, MQTTAsync_failureData5* failure_data_5) { +onConnectFinished(success_data, success_data_5, failure_data, failure_data_5); + }); + + setConnectOptions(connect_options, connect_properties, will_properties, connect_finished_task); + + logger_->log_info("Reconnecting to %s", uri_); if (MQTTAsync_isConnected(client_)) { -logger_->log_info("Already connected to %s, no need to reconnect", uri_); +logger_->log_debug("Already connected to %s, no need to reconnect", uri_); +return; + } + const int ret = MQTTAsync_connect(client_, &connect_options); + MQTTProperties_free(&connect_properties); + if (ret != MQTTASYNC_SUCCESS) { +logger_->log_error("MQTTAsync_connect failed to MQTT broker %s with error code [%d]", uri_, ret); return; } - MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; - conn_opts.keepAliveInterval = gsl::narrow(keep_alive_interval_.count()); - conn_opts.cleansession = getCleanSession(); - conn_opts.context = this; - conn_opts.onSuccess = connectionSuccess; - conn_opts.onFailure = connectionFailure; - 
conn_opts.connectTimeout = gsl::narrow(connection_timeout_.count()); + + // wait until connection succeeds or fails + connect_finished_task.get_future().get(); +} + +void AbstractMQTTProcessor::setConnectOptions(MQTTAsync_connectOptions& connect_options, MQTTProperties& connect_properties, + MQTTProperties& will_properties, const ConnectFinishedTask& connect_finished_task) const { + if (mqtt_version_.value() == MqttVersions::V_5_0) { +setMqtt5ConnectOptions(connect_options, connect_properties, will_properties); + } else { +setMqtt3ConnectOptions(connect_options); + } + + connect_options.context = const_cast(&connect_finished_task); + connect_options.connectTimeout = gsl::narrow(connection_timeout_.count()); + connect_options.keepAliveInterval = gsl::narrow(keep_alive_interval_.count()); if (!username_.empty()) { -conn_opts.username = username_.c_str(); -conn_opts.password = password_.c_str(); +connect_options.username = username_.c_str(); +connect_options.password = password_.c_str(); } if (sslOpts_) { -conn_opts.ssl = &*sslOpts_; +connect_options.ssl = const_cast(&*sslOpts_); } if (last_will_) { -conn_opts.will = &*last_will_; +connect_options.will = const_cast(&*last_will_); } +} - logger_->log_info("Reconnecting to %s", uri_); - int ret = MQTTAsync_connect(client_, &conn_opts); - if (ret != MQTTASYNC_SUCCESS) { -logger_->log_error("Failed to reconnect to MQTT broker %s (%d)", uri_, ret); +void AbstractMQTTProcessor::setMqtt3ConnectOptions(MQTTAsync_connectOptions& connect_options) const { + connect_options = MQTTAsync_connectOptions_initializer; + connect_options.onSuccess = connectionSuccess; + connect_options.onFailure = connectionFailure; + connect_options.cleansession = getCleanSession(); + + if (mqtt_version_.value() == MqttVersions::V_3_1_0) { +connect_options.MQTTVersion = MQTTVERSION_3_1; + } else if (mqtt_version_.value() == MqttVersions::V_3_1_1) { +connect_options.MQTTVersion = MQTTVERSION_3_1_1; + } +} + +void 
AbstractMQTTProcessor::setMqtt5ConnectOptions(MQTTAsync_connectOptions& connect_options, MQTTProperties& connect_properties, MQTTProperties& will_properties) const { + connect_options = MQTTAsync_connectOptions_initializer5; + connect_options.onSuccess5 = connectionSuccess5; + connect_options.onFailure5 = connectionFailure5; + connect_options.connectProperties = &connect_properties; + + connect_options.cleanstart = getCleanStart(); + + { +MQTTProperty property; +property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL; +property.value.integer4 = gsl::narrow(getSessionExpiryInterval().count()); +MQTTProperties_add(&connect_properties, &property); + } + + if (!last_will_content_type_.empty()) { +MQTTProperty property; +property.identifier = MQTTPRO
[jira] [Created] (MINIFICPP-2033) Check if we need runDurationNanos
Adam Debreceni created MINIFICPP-2033:
    Summary: Check if we need runDurationNanos
    Key: MINIFICPP-2033
    URL: https://issues.apache.org/jira/browse/MINIFICPP-2033
    Project: Apache NiFi MiNiFi C++
    Issue Type: Improvement
    Reporter: Adam Debreceni
Currently we can set runDurationNanos, but it is unused. Investigate whether we need it or whether it can be removed.
[jira] [Assigned] (NIFI-11059) Add PutBox processor
[ https://issues.apache.org/jira/browse/NIFI-11059?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Zsihovszki Krisztina reassigned NIFI-11059:
    Assignee: Zsihovszki Krisztina
> Add PutBox processor
>
> Key: NIFI-11059
> URL: https://issues.apache.org/jira/browse/NIFI-11059
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Reporter: Zsihovszki Krisztina
> Assignee: Zsihovszki Krisztina
> Priority: Major
>
> Add a processor that is able to upload files to a Box folder.
[jira] [Created] (NIFI-11059) Add PutBox processor
Zsihovszki Krisztina created NIFI-11059:
    Summary: Add PutBox processor
    Key: NIFI-11059
    URL: https://issues.apache.org/jira/browse/NIFI-11059
    Project: Apache NiFi
    Issue Type: New Feature
    Components: Extensions
    Reporter: Zsihovszki Krisztina
Add a processor that is able to upload files to a Box folder.
[GitHub] [nifi] nandorsoma commented on a diff in pull request #6850: NIFI-11049 Fixed flaky PutDropboxTest, attribute description corrected
nandorsoma commented on code in PR #6850: URL: https://github.com/apache/nifi/pull/6850#discussion_r1071262871 ## nifi-nar-bundles/nifi-dropbox-bundle/nifi-dropbox-processors/src/test/java/org/apache/nifi/processors/dropbox/PutDropboxTest.java: ## @@ -255,18 +258,18 @@ void testFileUploadLargeFile() throws Exception { .thenReturn(mockUploadSessionAppendV2Uploader); //finish session: 30 - 8 - 2 * 8 = 6 bytes uploaded -CommitInfo commitInfo = CommitInfo.newBuilder(getPath(TEST_FOLDER , FILENAME_1)) +CommitInfo commitInfo = CommitInfo.newBuilder(getPath(TEST_FOLDER, FILENAME_1)) Review Comment: I don't feel strongly, but I wouldn't recreate this object solely to have a prototype that can be used as a matcher. Plain any() matcher would be enough for me, but if we wanted to match its properties, I would create a Factory, especially for CommitInfo, which we can mock adequately.
[GitHub] [nifi-minifi-cpp] fgerlits commented on a diff in pull request #1457: MINIFICPP-1979 Use Coroutines with asio
fgerlits commented on code in PR #1457: URL: https://github.com/apache/nifi-minifi-cpp/pull/1457#discussion_r1067152515 ## extensions/standard-processors/processors/NetworkListenerProcessor.cpp: ## @@ -66,16 +66,16 @@ void NetworkListenerProcessor::startTcpServer(const core::ProcessContext& contex auto options = readServerOptions(context); std::string ssl_value; + std::optional ssl_options; if (context.getProperty(ssl_context_property.getName(), ssl_value) && !ssl_value.empty()) { auto ssl_data = utils::net::getSslData(context, ssl_context_property, logger_); if (!ssl_data || !ssl_data->isValid()) { throw Exception(PROCESSOR_EXCEPTION, "SSL Context Service is set, but no valid SSL data was found!"); } -auto client_auth = utils::parseEnumProperty(context, client_auth_property); -server_ = std::make_unique(options.max_queue_size, options.port, logger_, *ssl_data, client_auth); - } else { -server_ = std::make_unique(options.max_queue_size, options.port, logger_); +auto client_auth = utils::parseEnumProperty(context, client_auth_property); +ssl_options.emplace(utils::net::SslServerOptions{std::move(*ssl_data), client_auth}); Review Comment: minor, but this constructs `SslServerOptions` first, then moves it; you can get rid of the move by changing it to ```suggestion ssl_options.emplace(std::move(*ssl_data), client_auth); ``` ## extensions/standard-processors/tests/unit/ListenSyslogTests.cpp: ## @@ -249,132 +248,155 @@ void check_parsed_attributes(const core::FlowFile& flow_file, const ValidRFC3164 CHECK(original_message.msg_ == flow_file.getAttribute("syslog.msg")); } -TEST_CASE("ListenSyslog without parsing test", "[ListenSyslog][NetworkListenerProcessor]") { +uint16_t schedule_on_random_port(SingleProcessorTestController& controller, const std::shared_ptr& listen_syslog) { + REQUIRE(listen_syslog->setProperty(ListenSyslog::Port, "0")); + controller.plan->scheduleProcessor(listen_syslog); + uint16_t port = listen_syslog->getPort(); + auto deadline = 
std::chrono::steady_clock::now() + 200ms; + while (port == 0 && deadline > std::chrono::steady_clock::now()) { +std::this_thread::sleep_for(20ms); +port = listen_syslog->getPort(); + } + REQUIRE(port != 0); + return port; +} Review Comment: can we use `utils::scheduleProcessorOnRandomPort()` instead of this? ## libminifi/test/Utils.h: ## @@ -183,33 +188,54 @@ bool sendMessagesViaSSL(const std::vector& contents, asio::error_code err; socket.lowest_layer().connect(remote_endpoint, err); if (err) { -return false; +return err; } socket.handshake(asio::ssl::stream_base::client, err); if (err) { -return false; +return err; } for (auto& content : contents) { std::string tcp_message(content); tcp_message += '\n'; asio::write(socket, asio::buffer(tcp_message, tcp_message.size()), err); if (err) { - return false; + return err; } } - return true; + return std::error_code(); } #ifdef WIN32 inline std::error_code hide_file(const std::filesystem::path& file_name) { -const bool success = SetFileAttributesA(file_name.string().c_str(), FILE_ATTRIBUTE_HIDDEN); -if (!success) { - // note: All possible documented error codes from GetLastError are in [0;15999] at the time of writing. - // The below casting is safe in [0;std::numeric_limits::max()], int max is guaranteed to be at least 32767 - return { static_cast(GetLastError()), std::system_category() }; -} -return {}; + const bool success = SetFileAttributesA(file_name.string().c_str(), FILE_ATTRIBUTE_HIDDEN); + if (!success) { +// note: All possible documented error codes from GetLastError are in [0;15999] at the time of writing. 
+// The below casting is safe in [0;std::numeric_limits::max()], int max is guaranteed to be at least 32767 +return { static_cast(GetLastError()), std::system_category() }; } + return {}; +} #endif /* WIN32 */ +template +concept NetworkingProcessor = std::derived_from +&& requires(T x) { + {T::Port} -> std::convertible_to; + {x.getPort()} -> std::convertible_to; +}; // NOLINT(readability/braces) + +template +uint16_t scheduleProcessorOnRandomPort(const std::shared_ptr& test_plan, const std::shared_ptr& processor) { + REQUIRE(processor->setProperty(T::Port, "0")); + test_plan->scheduleProcessor(processor); + uint16_t port = processor->getPort(); + auto deadline = std::chrono::steady_clock::now() + 200ms; + while (port == 0 && deadline > std::chrono::steady_clock::now()) { +std::this_thread::sleep_for(20ms); +port = processor->getPort(); + } + REQUIRE(port != 0); + return port; Review Comment: this could be rewritten to use `verifyEventHappenedInPollTime`, too ## extensions/standard-processors/processors/PutTCP.cpp: ## @@ -160,339 +178,147 @
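Editorial note on the first review comment in this message (the `ssl_options.emplace(...)` suggestion): the difference between passing a ready-made temporary to `std::optional::emplace` and passing the constructor arguments directly can be sketched with a small move-counting type. This is a minimal sketch, not code from the PR; `Options` is a hypothetical stand-in for `utils::net::SslServerOptions`, and it is given an explicit constructor so the in-place form does not depend on aggregate initialization.

```cpp
#include <cassert>
#include <optional>
#include <string>
#include <utility>

// Hypothetical stand-in for utils::net::SslServerOptions.
// It counts how often a whole Options object is move-constructed.
struct Options {
  static inline int move_count = 0;
  std::string cert;
  int client_auth;

  Options(std::string c, int a) : cert(std::move(c)), client_auth(a) {}
  Options(Options&& other) noexcept
      : cert(std::move(other.cert)), client_auth(other.client_auth) {
    ++move_count;
  }
};

// Builds a temporary Options first; emplace then move-constructs from it.
int movesWithTemporary() {
  Options::move_count = 0;
  std::optional<Options> opt;
  opt.emplace(Options{"cert", 1});
  return Options::move_count;
}

// Forwards the constructor arguments; the object is built inside the optional.
int movesInPlace() {
  Options::move_count = 0;
  std::optional<Options> opt;
  opt.emplace("cert", 1);
  return Options::move_count;
}
```

Under these assumptions the temporary form costs one extra move of the whole object and the in-place form costs none, which is the saving the reviewer describes as minor.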
[jira] [Updated] (NIFI-11058) QueryDatabaseTable Maximum-value Columns oracle performance
[ https://issues.apache.org/jira/browse/NIFI-11058?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Zenkovac updated NIFI-11058:
    Description:
when running an incremental query with QueryDatabaseTable, the field under "Maximum-value Columns" is used in subsequent queries to create an incremental ingest of data, but such field is used in the query "as is", so oracle becomes un-performant.
make an improvent providing the ability to set the "Maximum-value Columns" as an oracle bind variable.
explanation: [https://www.akadia.com/services/ora_bind_variables.html]
  was:
when running an incremental query with QueryDatabaseTable, the field under "Maximum-value Columns" is used in subsequent queries to create an incremental ingest of data, but such field is used in the queri "as is", so oracle become un-performant.
make an improvent providing the ability to set the "Maximum-value Columns" as an oracle bind variable.
explanation: https://www.akadia.com/services/ora_bind_variables.html
> QueryDatabaseTable Maximum-value Columns oracle performance
> ---
>
> Key: NIFI-11058
> URL: https://issues.apache.org/jira/browse/NIFI-11058
> Project: Apache NiFi
> Issue Type: Improvement
> Environment: nifi up to latest 1.19.1
> Reporter: Zenkovac
> Priority: Major
>
> when running an incremental query with QueryDatabaseTable, the field under
> "Maximum-value Columns" is used in subsequent queries to create an
> incremental ingest of data, but such field is used in the query "as is", so
> oracle becomes un-performant.
> make an improvent providing the ability to set the "Maximum-value Columns" as
> an oracle bind variable.
> explanation: [https://www.akadia.com/services/ora_bind_variables.html]
[jira] [Created] (NIFI-11058) QueryDatabaseTable Maximum-value Columns oracle performance
Zenkovac created NIFI-11058:
    Summary: QueryDatabaseTable Maximum-value Columns oracle performance
    Key: NIFI-11058
    URL: https://issues.apache.org/jira/browse/NIFI-11058
    Project: Apache NiFi
    Issue Type: Improvement
    Environment: nifi up to latest 1.19.1
    Reporter: Zenkovac
when running an incremental query with QueryDatabaseTable, the field under "Maximum-value Columns" is used in subsequent queries to create an incremental ingest of data, but such field is used in the queri "as is", so oracle become un-performant.
make an improvent providing the ability to set the "Maximum-value Columns" as an oracle bind variable.
explanation: https://www.akadia.com/services/ora_bind_variables.html
[GitHub] [nifi] mr1716 opened a new pull request, #6851: NIFI-11024: Update iceberg.version to 1.1.0
mr1716 opened a new pull request, #6851:
URL: https://github.com/apache/nifi/pull/6851
# Summary
[NIFI-11024](https://issues.apache.org/jira/browse/NIFI-11024)
# Tracking
Please complete the following tracking steps prior to pull request creation.
### Issue Tracking
- [X] [Apache NiFi Jira](https://issues.apache.org/jira/browse/NIFI-11024) issue created
### Pull Request Tracking
- [X] Pull Request title starts with Apache NiFi Jira issue number, such as `NIFI-0`
- [X] Pull Request commit message starts with Apache NiFi Jira issue number, as such `NIFI-0`
### Pull Request Formatting
- [X] Pull Request based on current revision of the `main` branch
- [X] Pull Request refers to a feature branch with one commit containing changes
# Verification
Please indicate the verification steps performed prior to pull request creation.
### Build
- [ ] Build completed using `mvn clean install -P contrib-check`
- [X] JDK 8
- [X] JDK 11
- [X] JDK 17
### Licensing
- [ ] New dependencies are compatible with the [Apache License 2.0](https://apache.org/licenses/LICENSE-2.0) according to the [License Policy](https://www.apache.org/legal/resolved.html)
- [ ] New dependencies are documented in applicable `LICENSE` and `NOTICE` files
### Documentation
- [ ] Documentation formatting appears as expected in rendered files
[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5
adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1071202236

## extensions/mqtt/processors/AbstractMQTTProcessor.cpp: ##

@@ -137,49 +148,317 @@ void AbstractMQTTProcessor::onSchedule(const std::shared_ptrlog_error("MQTT client is not existing while trying to reconnect");
+    throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "MQTT client is not existing while trying to reconnect");
+  }
+  if (MQTTAsync_isConnected(client_)) {
+    logger_->log_debug("Already connected to %s, no need to reconnect", uri_);
     return;
   }
+
+  MQTTAsync_connectOptions connect_options;
+  MQTTProperties connect_properties = MQTTProperties_initializer;
+  MQTTProperties will_properties = MQTTProperties_initializer;
+
+  ConnectFinishedTask connect_finished_task(
+          [this] (MQTTAsync_successData* success_data, MQTTAsync_successData5* success_data_5, MQTTAsync_failureData* failure_data, MQTTAsync_failureData5* failure_data_5) {
+            onConnectFinished(success_data, success_data_5, failure_data, failure_data_5);
+          });
+
+  setConnectOptions(connect_options, connect_properties, will_properties, connect_finished_task);
+
+  logger_->log_info("Reconnecting to %s", uri_);
   if (MQTTAsync_isConnected(client_)) {
-    logger_->log_info("Already connected to %s, no need to reconnect", uri_);
+    logger_->log_debug("Already connected to %s, no need to reconnect", uri_);
+    return;
+  }
+  const int ret = MQTTAsync_connect(client_, &connect_options);
+  MQTTProperties_free(&connect_properties);
+  if (ret != MQTTASYNC_SUCCESS) {
+    logger_->log_error("MQTTAsync_connect failed to MQTT broker %s with error code [%d]", uri_, ret);
     return;
   }
-  MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
-  conn_opts.keepAliveInterval = gsl::narrow(keep_alive_interval_.count());
-  conn_opts.cleansession = getCleanSession();
-  conn_opts.context = this;
-  conn_opts.onSuccess = connectionSuccess;
-  conn_opts.onFailure = connectionFailure;
-  conn_opts.connectTimeout = gsl::narrow(connection_timeout_.count());
+
+  // wait until connection succeeds or fails
+  connect_finished_task.get_future().get();
+}
+
+void AbstractMQTTProcessor::setConnectOptions(MQTTAsync_connectOptions& connect_options, MQTTProperties& connect_properties,
+                                              MQTTProperties& will_properties, const ConnectFinishedTask& connect_finished_task) const {
+  if (mqtt_version_.value() == MqttVersions::V_5_0) {
+    setMqtt5ConnectOptions(connect_options, connect_properties, will_properties);
+  } else {
+    setMqtt3ConnectOptions(connect_options);
+  }
+
+  connect_options.context = const_cast(&connect_finished_task);
+  connect_options.connectTimeout = gsl::narrow(connection_timeout_.count());
+  connect_options.keepAliveInterval = gsl::narrow(keep_alive_interval_.count());
   if (!username_.empty()) {
-    conn_opts.username = username_.c_str();
-    conn_opts.password = password_.c_str();
+    connect_options.username = username_.c_str();
+    connect_options.password = password_.c_str();
   }
   if (sslOpts_) {
-    conn_opts.ssl = &*sslOpts_;
+    connect_options.ssl = const_cast(&*sslOpts_);
   }
   if (last_will_) {
-    conn_opts.will = &*last_will_;
+    connect_options.will = const_cast(&*last_will_);
   }
+}
-  logger_->log_info("Reconnecting to %s", uri_);
-  int ret = MQTTAsync_connect(client_, &conn_opts);
-  if (ret != MQTTASYNC_SUCCESS) {
-    logger_->log_error("Failed to reconnect to MQTT broker %s (%d)", uri_, ret);
+void AbstractMQTTProcessor::setMqtt3ConnectOptions(MQTTAsync_connectOptions& connect_options) const {
+  connect_options = MQTTAsync_connectOptions_initializer;
+  connect_options.onSuccess = connectionSuccess;
+  connect_options.onFailure = connectionFailure;
+  connect_options.cleansession = getCleanSession();
+
+  if (mqtt_version_.value() == MqttVersions::V_3_1_0) {
+    connect_options.MQTTVersion = MQTTVERSION_3_1;
+  } else if (mqtt_version_.value() == MqttVersions::V_3_1_1) {
+    connect_options.MQTTVersion = MQTTVERSION_3_1_1;
+  }
+}
+
+void AbstractMQTTProcessor::setMqtt5ConnectOptions(MQTTAsync_connectOptions& connect_options, MQTTProperties& connect_properties, MQTTProperties& will_properties) const {
+  connect_options = MQTTAsync_connectOptions_initializer5;
+  connect_options.onSuccess5 = connectionSuccess5;
+  connect_options.onFailure5 = connectionFailure5;
+  connect_options.connectProperties = &connect_properties;
+
+  connect_options.cleanstart = getCleanStart();
+
+  {
+    MQTTProperty property;
+    property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL;
+    property.value.integer4 = gsl::narrow(getSessionExpiryInterval().count());

Review Comment:
That does not, but the MQTT 5 spec does: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901048 "Followed by the Four Byt
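Since the spec passage linked above describes the Session Expiry Interval as a Four Byte Integer, the value assigned to `integer4` has to fit into 32 unsigned bits. A minimal sketch of a checked conversion follows; `toFourByteInteger` is a hypothetical helper, not something from the PR, and it only assumes the interval is held as `std::chrono::seconds`.

```cpp
#include <chrono>
#include <cstdint>
#include <limits>
#include <optional>

// Hypothetical helper (not part of the PR): converts an interval in seconds
// to the uint32_t that a Four Byte Integer property can carry.
// Returns std::nullopt when the value is negative or does not fit in 32 bits.
std::optional<std::uint32_t> toFourByteInteger(std::chrono::seconds interval) {
  const auto count = interval.count();
  if (count < 0) {
    return std::nullopt;  // negative intervals cannot be encoded
  }
  if (static_cast<std::uint64_t>(count) > std::numeric_limits<std::uint32_t>::max()) {
    return std::nullopt;  // larger than the four-byte range
  }
  return static_cast<std::uint32_t>(count);
}
```

The sketch reports an out-of-range value through an empty optional, whereas a throwing narrowing conversion would raise an exception instead; which one suits the processor is a design choice for the PR author.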
[GitHub] [nifi-minifi-cpp] szaszm commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5
szaszm commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1066925909

## extensions/mqtt/processors/AbstractMQTTProcessor.cpp: ##

@@ -137,49 +148,317 @@ void AbstractMQTTProcessor::onSchedule(const std::shared_ptrlog_error("MQTT client is not existing while trying to reconnect");
+    throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "MQTT client is not existing while trying to reconnect");
+  }
+  if (MQTTAsync_isConnected(client_)) {
+    logger_->log_debug("Already connected to %s, no need to reconnect", uri_);
     return;
   }
+
+  MQTTAsync_connectOptions connect_options;
+  MQTTProperties connect_properties = MQTTProperties_initializer;
+  MQTTProperties will_properties = MQTTProperties_initializer;
+
+  ConnectFinishedTask connect_finished_task(
+          [this] (MQTTAsync_successData* success_data, MQTTAsync_successData5* success_data_5, MQTTAsync_failureData* failure_data, MQTTAsync_failureData5* failure_data_5) {
+            onConnectFinished(success_data, success_data_5, failure_data, failure_data_5);
+          });
+
+  setConnectOptions(connect_options, connect_properties, will_properties, connect_finished_task);
+
+  logger_->log_info("Reconnecting to %s", uri_);
   if (MQTTAsync_isConnected(client_)) {
-    logger_->log_info("Already connected to %s, no need to reconnect", uri_);
+    logger_->log_debug("Already connected to %s, no need to reconnect", uri_);
+    return;
+  }
+  const int ret = MQTTAsync_connect(client_, &connect_options);
+  MQTTProperties_free(&connect_properties);
+  if (ret != MQTTASYNC_SUCCESS) {
+    logger_->log_error("MQTTAsync_connect failed to MQTT broker %s with error code [%d]", uri_, ret);
     return;
   }
-  MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
-  conn_opts.keepAliveInterval = gsl::narrow(keep_alive_interval_.count());
-  conn_opts.cleansession = getCleanSession();
-  conn_opts.context = this;
-  conn_opts.onSuccess = connectionSuccess;
-  conn_opts.onFailure = connectionFailure;
-  conn_opts.connectTimeout = gsl::narrow(connection_timeout_.count());
+
+  // wait until connection succeeds or fails
+  connect_finished_task.get_future().get();
+}
+
+void AbstractMQTTProcessor::setConnectOptions(MQTTAsync_connectOptions& connect_options, MQTTProperties& connect_properties,
+                                              MQTTProperties& will_properties, const ConnectFinishedTask& connect_finished_task) const {
+  if (mqtt_version_.value() == MqttVersions::V_5_0) {
+    setMqtt5ConnectOptions(connect_options, connect_properties, will_properties);
+  } else {
+    setMqtt3ConnectOptions(connect_options);
+  }
+
+  connect_options.context = const_cast(&connect_finished_task);
+  connect_options.connectTimeout = gsl::narrow(connection_timeout_.count());
+  connect_options.keepAliveInterval = gsl::narrow(keep_alive_interval_.count());
   if (!username_.empty()) {
-    conn_opts.username = username_.c_str();
-    conn_opts.password = password_.c_str();
+    connect_options.username = username_.c_str();
+    connect_options.password = password_.c_str();
   }
   if (sslOpts_) {
-    conn_opts.ssl = &*sslOpts_;
+    connect_options.ssl = const_cast(&*sslOpts_);
   }
   if (last_will_) {
-    conn_opts.will = &*last_will_;
+    connect_options.will = const_cast(&*last_will_);
   }
+}
-  logger_->log_info("Reconnecting to %s", uri_);
-  int ret = MQTTAsync_connect(client_, &conn_opts);
-  if (ret != MQTTASYNC_SUCCESS) {
-    logger_->log_error("Failed to reconnect to MQTT broker %s (%d)", uri_, ret);
+void AbstractMQTTProcessor::setMqtt3ConnectOptions(MQTTAsync_connectOptions& connect_options) const {
+  connect_options = MQTTAsync_connectOptions_initializer;
+  connect_options.onSuccess = connectionSuccess;
+  connect_options.onFailure = connectionFailure;
+  connect_options.cleansession = getCleanSession();
+
+  if (mqtt_version_.value() == MqttVersions::V_3_1_0) {
+    connect_options.MQTTVersion = MQTTVERSION_3_1;
+  } else if (mqtt_version_.value() == MqttVersions::V_3_1_1) {
+    connect_options.MQTTVersion = MQTTVERSION_3_1_1;
+  }
+}
+
+void AbstractMQTTProcessor::setMqtt5ConnectOptions(MQTTAsync_connectOptions& connect_options, MQTTProperties& connect_properties, MQTTProperties& will_properties) const {
+  connect_options = MQTTAsync_connectOptions_initializer5;
+  connect_options.onSuccess5 = connectionSuccess5;
+  connect_options.onFailure5 = connectionFailure5;
+  connect_options.connectProperties = &connect_properties;
+
+  connect_options.cleanstart = getCleanStart();
+
+  {
+    MQTTProperty property;
+    property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL;
+    property.value.integer4 = gsl::narrow(getSessionExpiryInterval().count());

Review Comment:
I think we should verify the type of this property. The Paho MQTT C docs don't document the type of each property, but provide a function to query it:
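To illustrate what verifying the property type could look like, here is a minimal, self-contained sketch. It does not call into Paho (to my knowledge the C client exposes `MQTTProperty_getType` for this kind of lookup, but that is an assumption here); instead `propertyType` and `PropertyType` are hypothetical names, and the handful of entries are hard-coded from the property table in the MQTT 5 specification.

```cpp
#include <cstdint>
#include <optional>

// Hypothetical enum for this sketch; not a Paho type.
enum class PropertyType { Byte, TwoByteInteger, FourByteInteger, Utf8String };

// Hypothetical lookup covering only a few MQTT 5 property identifiers.
// Unknown identifiers yield std::nullopt rather than a guessed type.
constexpr std::optional<PropertyType> propertyType(std::uint8_t identifier) {
  switch (identifier) {
    case 0x01: return PropertyType::Byte;             // Payload Format Indicator
    case 0x02: return PropertyType::FourByteInteger;  // Message Expiry Interval
    case 0x03: return PropertyType::Utf8String;       // Content Type
    case 0x11: return PropertyType::FourByteInteger;  // Session Expiry Interval
    case 0x21: return PropertyType::TwoByteInteger;   // Receive Maximum
    case 0x22: return PropertyType::TwoByteInteger;   // Topic Alias Maximum
    case 0x27: return PropertyType::FourByteInteger;  // Maximum Packet Size
    default:   return std::nullopt;
  }
}
```

A check such as `propertyType(0x11) == PropertyType::FourByteInteger` before writing to `value.integer4` would catch a mismatch between the chosen union member and the property's wire type.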
[GitHub] [nifi] nandorsoma commented on pull request #6850: NIFI-11049 Fixed flaky PutDropboxTest, attribute description corrected
nandorsoma commented on PR #6850: URL: https://github.com/apache/nifi/pull/6850#issuecomment-1383971711 Will review!
[GitHub] [nifi-minifi-cpp] szaszm commented on a diff in pull request #1491: MINIFICPP-2032 Add support for new AWS regions
szaszm commented on code in PR #1491:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1491#discussion_r1071096863

## CMakeLists.txt: ##

@@ -106,7 +106,7 @@ if (WIN32)
 endif()
 if (WIN32)
-    add_definitions(-DSERVICE_NAME="Apache NiFi MINiFi")
+    add_definitions(-DMINIFI_SERVICE_NAME="Apache NiFi MINiFi")

Review Comment:
Thanks for the explanation and the change. I like the scoped version more.
[GitHub] [nifi-minifi-cpp] lordgamez commented on a diff in pull request #1491: MINIFICPP-2032 Add support for new AWS regions
lordgamez commented on code in PR #1491:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1491#discussion_r1071077522

## CMakeLists.txt: ##

@@ -106,7 +106,7 @@ if (WIN32)
 endif()
 if (WIN32)
-    add_definitions(-DSERVICE_NAME="Apache NiFi MINiFi")
+    add_definitions(-DMINIFI_SERVICE_NAME="Apache NiFi MINiFi")

Review Comment:
Moved SERVICE_NAME to target_compile_definitions in 45cbd67ab187b8f4164058d4c0c497ff9733fb32
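For context on why a prefixed or target-scoped definition is the safer choice: a directory-wide `SERVICE_NAME` macro is substituted into every translation unit, including third-party headers that happen to use the same identifier. The sketch below shows the effect on the C++ side. `ExampleClient` is a hypothetical stand-in for such a header, and the `#ifndef` fallback exists only so the snippet compiles without the build system passing the definition.

```cpp
#include <string>

// Assumption for this sketch: on Windows the build passes
// -DMINIFI_SERVICE_NAME="Apache NiFi MINiFi"; the fallback keeps the
// snippet compilable on its own.
#ifndef MINIFI_SERVICE_NAME
#define MINIFI_SERVICE_NAME "Apache NiFi MINiFi"
#endif

// Hypothetical third-party style class with a member named SERVICE_NAME.
// With a global `#define SERVICE_NAME "..."` this declaration would fail to
// compile, because the preprocessor would replace the member name with a
// string literal. The prefixed macro leaves it untouched.
struct ExampleClient {
  static constexpr const char* SERVICE_NAME = "example";
};

// The project code reads its own, unambiguous macro.
inline std::string serviceDisplayName() {
  return MINIFI_SERVICE_NAME;
}
```

Attaching the definition to a single target with `target_compile_definitions` narrows it further, since only that target's sources see the macro.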
[GitHub] [nifi] krisztina-zsihovszki opened a new pull request, #6850: NIFI-11049 Fixed flaky PutDropboxTest, attribute description corrected
krisztina-zsihovszki opened a new pull request, #6850:
URL: https://github.com/apache/nifi/pull/6850

# Summary

[NIFI-11049](https://issues.apache.org/jira/browse/NIFI-11049)

# Tracking

Please complete the following tracking steps prior to pull request creation.

### Issue Tracking

- [x] [Apache NiFi Jira](https://issues.apache.org/jira/browse/NIFI) issue created

### Pull Request Tracking

- [x] Pull Request title starts with Apache NiFi Jira issue number, such as `NIFI-11049`
- [x] Pull Request commit message starts with Apache NiFi Jira issue number, as such `NIFI-11049`

### Pull Request Formatting

- [x] Pull Request based on current revision of the `main` branch
- [x] Pull Request refers to a feature branch with one commit containing changes

# Verification

Please indicate the verification steps performed prior to pull request creation.

### Build

- [ ] Build completed using `mvn clean install -P contrib-check`
  - [x] JDK 8
  - [x] JDK 11
  - [x] JDK 17

### Licensing

- [ ] New dependencies are compatible with the [Apache License 2.0](https://apache.org/licenses/LICENSE-2.0) according to the [License Policy](https://www.apache.org/legal/resolved.html)
- [ ] New dependencies are documented in applicable `LICENSE` and `NOTICE` files

### Documentation

- [x] Documentation formatting appears as expected in rendered files
[GitHub] [nifi] nandorsoma commented on pull request #6832: NIFI-10965 PutGoogleDrive
nandorsoma commented on PR #6832: URL: https://github.com/apache/nifi/pull/6832#issuecomment-1383687714 Will review