[jira] [Created] (NIFIREG-377) Sub buckets / Ability to move flow in another bucket
DEOM Damien created NIFIREG-377: --- Summary: Sub buckets / Ability to move flow in another bucket Key: NIFIREG-377 URL: https://issues.apache.org/jira/browse/NIFIREG-377 Project: NiFi Registry Issue Type: Improvement Reporter: DEOM Damien As the number of flows grows in the registry, it would be very convenient to add an option to move a dataflow in another bucket / sub bucket. Nice to have: add a parent attribute to buckets, and display all buckets in hierarchical form -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [nifi] MuazmaZ commented on issue #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2
MuazmaZ commented on issue #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2 URL: https://github.com/apache/nifi/pull/4126#issuecomment-606341926 @turcsanyip confirmed with the SDK team that getDirectoryClient("") is valid. Made the change in the property to allow an empty string. Removed the PROP_ prefix for SAS_TOKEN. Updated to catch Exception and tested the scenario; I was able to see the failure files with a non-existing filesystem. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi] turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2
turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2 URL: https://github.com/apache/nifi/pull/4126#discussion_r400524840 ## File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java ## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.azure.storage; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; + +import com.azure.storage.file.datalake.DataLakeDirectoryClient; +import com.azure.storage.file.datalake.DataLakeFileClient; +import com.azure.storage.file.datalake.DataLakeFileSystemClient; +import com.azure.storage.file.datalake.DataLakeServiceClient; +import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor; +import com.azure.storage.file.datalake.implementation.models.StorageErrorException; + +@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"}) +@CapabilityDescription("Puts content into an Azure Data Lake Storage Gen 2") +@WritesAttributes({@WritesAttribute(attribute = "azure.filesystem", description = "The name of the Azure File System"), +@WritesAttribute(attribute = "azure.directory", description = "The name of the Azure Directory"), +@WritesAttribute(attribute = "azure.filename", description = "The name of the Azure File Name"), +@WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for file content"), +@WritesAttribute(attribute = "azure.length", description = "Length of the file")}) +@InputRequirement(Requirement.INPUT_REQUIRED) + +public class 
PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor { + +@Override +public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { +FlowFile flowFile = session.get(); +if (flowFile == null) { +return; +} +final long startNanos = System.nanoTime(); +try { +final String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue(); +final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue(); +final String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue(); +final DataLakeServiceClient storageClient = getStorageClient(context, flowFile); +final DataLakeFileSystemClient dataLakeFileSystemClient = storageClient.getFileSystemClient(fileSystem); +final DataLakeDirectoryClient directoryClient = dataLakeFileSystemClient.getDirectoryClient(directory); +final DataLakeFileClient fileClient = directoryClient.createFile(fileName); +final long length = flowFile.getSize(); +if (length > 0) { +try (final InputStream rawIn = session.read(flowFile); final BufferedInputStream in = new BufferedInputStream(rawIn)) { +fileClient.append(in, 0, length); + +} +} +fileClient.flush(length); +final Map attributes = new HashMap<>(); +attributes.put("azure.filesystem", fileSystem); +attributes.put("azure.directory", directory); +attributes.put("azure.filename",
[GitHub] [nifi] turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2
turcsanyip commented on a change in pull request #4126: NIFI-7103 Adding PutAzureDataLakeStorage Processor to provide native support for Azure Data Lake Storage Gen 2 URL: https://github.com/apache/nifi/pull/4126#discussion_r400519009 ## File path: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java ## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.azure; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.commons.lang3.StringUtils; + +import com.azure.storage.common.StorageSharedKeyCredential; +import com.azure.storage.file.datalake.DataLakeServiceClient; +import com.azure.storage.file.datalake.DataLakeServiceClientBuilder; + +import java.util.Arrays; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.Map; + +public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProcessor { + +public static final PropertyDescriptor ACCOUNT_NAME = new PropertyDescriptor.Builder() +.name("storage-account-name").displayName("Storage Account Name") +.description("The storage account name. There are certain risks in allowing the account name to be stored as a flowfile " + +"attribute. While it does provide for a more flexible flow by allowing the account name to " + +"be fetched dynamically from a flowfile attribute, care must be taken to restrict access to " + +"the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " + +"In addition, the provenance repositories may be put on encrypted disk partitions." 
+ +" Instead of defining the Storage Account Name, Storage Account Key and SAS Token properties directly on the processor, " + +"the preferred way is to configure them through a controller service") +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) +.required(true) +.sensitive(true).build(); + +public static final PropertyDescriptor ACCOUNT_KEY = new PropertyDescriptor.Builder() +.name("storage-account-key").displayName("Storage Account Key") +.description("The storage account key. This is an admin-like password providing access to every container in this account. It is recommended " + +"one uses Shared Access Signature (SAS) token instead for fine-grained control with policies. " + +"There are certain risks in allowing the account key to be stored as a flowfile " + +"attribute. While it does provide for a more flexible flow by allowing the account key to " + +"be fetched dynamically from a flow file attribute, care must be taken to restrict access to " + +"the event provenance data (e.g. by strictly controlling the policies governing provenance for this Processor). " + +"In addition, the provenance repositories may be put on encrypted disk partitions.") +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) +.required(false) +.sensitive(true).build(); + +public static final PropertyDescriptor PROP_SAS_TOKEN = new PropertyDescriptor.Builder() Review comment: Minor: I overlooked it last time
[GitHub] [nifi-registry] thenatog opened a new pull request #271: NIFIREG-376 - Upgrading jackson-databind to 2.9.10.3
thenatog opened a new pull request #271: NIFIREG-376 - Upgrading jackson-databind to 2.9.10.3 URL: https://github.com/apache/nifi-registry/pull/271
[jira] [Created] (NIFIREG-376) Upgrade jackson-databind
Nathan Gough created NIFIREG-376: Summary: Upgrade jackson-databind Key: NIFIREG-376 URL: https://issues.apache.org/jira/browse/NIFIREG-376 Project: NiFi Registry Issue Type: Task Affects Versions: 0.5.0 Reporter: Nathan Gough Assignee: Nathan Gough Fix For: 0.6.0 Upgrade jackson-databind to latest (2.9.10.3).
[jira] [Commented] (NIFI-7294) Flows with SolrProcessor configured to use SSLContextService is failing
[ https://issues.apache.org/jira/browse/NIFI-7294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17071271#comment-17071271 ] ASF subversion and git services commented on NIFI-7294: --- Commit fee1b8b8e04e5901db612e52c9d53f9096f6d558 in nifi's branch refs/heads/master from Mubashir Kazia [ https://gitbox.apache.org/repos/asf?p=nifi.git;h=fee1b8b ] NIFI-7294 Address deprecation issues in solrj and httpclient Some calls to deprecated methods in httpclient were resulting in UnsupportedOperationException. Use the new API calls in both httpclient and solrj. Add an integration test to include test coverage for org.apache.nifi.processors.solr.SolrUtils.createClient This closes #4171. > Flows with SolrProcessor configured to use SSLContextService is failing > --- > > Key: NIFI-7294 > URL: https://issues.apache.org/jira/browse/NIFI-7294 > Project: Apache NiFi > Issue Type: Bug > Components: Extensions >Affects Versions: 1.12.0 >Reporter: Mubashir Kazia >Priority: Major > Time Spent: 1h 50m > Remaining Estimate: 0h > > After applying the fix in NIFI-7269, flows using SolrProcessors that are > configured to use SSLContextService are failing with exception: > {code} > java.lang.UnsupportedOperationException: null > at > org.apache.http.impl.client.InternalHttpClient$1.getSchemeRegistry(InternalHttpClient.java:239) > at > org.apache.nifi.processors.solr.SolrUtils.createSolrClient(SolrUtils.java:236) > at > org.apache.nifi.processors.solr.SolrProcessor.createSolrClient(SolrProcessor.java:147) > at > org.apache.nifi.processors.solr.SolrProcessor.onScheduled(SolrProcessor.java:77) > at sun.reflect.GeneratedMethodAccessor166.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:142) > at > 
org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:130) > at > org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:75) > at > org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotation(ReflectionUtils.java:52) > at > org.apache.nifi.controller.StandardProcessorNode.lambda$initiateStart$4(StandardProcessorNode.java:1515) > at org.apache.nifi.engine.FlowEngine$3.call(FlowEngine.java:123) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745){code} > This is because of the new httpclient version that was pulled as a dependency > as part of the solrj upgrade in NIFI-7269. The getConnectionManager method in > HttpClient is deprecated and the way to set the scheme registry has changed > in the new version of HttpClient. The problematic code is > {code} > httpClient.getConnectionManager().getSchemeRegistry().register(httpsScheme); > {code} > in function createSolrClient. There is no test coverage for this function > which is why it was not detected in the code changes for NIFI-7269.
[jira] [Resolved] (NIFI-7294) Flows with SolrProcessor configured to use SSLContextService is failing
[ https://issues.apache.org/jira/browse/NIFI-7294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bryan Bende resolved NIFI-7294. --- Fix Version/s: 1.12.0 Assignee: Bryan Bende Resolution: Fixed > Flows with SolrProcessor configured to use SSLContextService is failing > --- > > Key: NIFI-7294 > URL: https://issues.apache.org/jira/browse/NIFI-7294 > Project: Apache NiFi > Issue Type: Bug > Components: Extensions >Affects Versions: 1.12.0 >Reporter: Mubashir Kazia >Assignee: Bryan Bende >Priority: Major > Fix For: 1.12.0 > > Time Spent: 2h > Remaining Estimate: 0h
[GitHub] [nifi] asfgit closed pull request #4171: NIFI-7294 Address deprecation issues in solrj and httpclient
asfgit closed pull request #4171: NIFI-7294 Address deprecation issues in solrj and httpclient URL: https://github.com/apache/nifi/pull/4171
[GitHub] [nifi] IlyaKovalev opened a new pull request #4172: NIFI-7297 add available()
IlyaKovalev opened a new pull request #4172: NIFI-7297 add available() URL: https://github.com/apache/nifi/pull/4172 Thank you for submitting a contribution to Apache NiFi. Please provide a short description of the PR here: Description of PR fixes bug NIFI-7297 In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [ ] Does your PR title start with **NIFI-XXXX** where **XXXX** is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [ ] Has your PR been rebased against the latest commit within the target branch (typically `master`)? - [ ] Is your initial contribution a single, squashed commit? _Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not `squash` or use `--force` when pushing to allow for clean monitoring of changes._ ### For code changes: - [ ] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder? - [ ] Have you written or updated unit tests to verify your changes? - [ ] Have you verified that the full build is successful on both JDK 8 and JDK 11? - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the `LICENSE` file, including the main `LICENSE` file under `nifi-assembly`? - [ ] If applicable, have you updated the `NOTICE` file, including the main `NOTICE` file found under `nifi-assembly`? - [ ] If adding new Properties, have you added `.displayName` in addition to .name (programmatic access) for each of the new properties?
### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
[jira] [Created] (NIFI-7297) Mock bundle: InputStream created by MockProcessSession doesn't count available() bytes
Ilya Kovalev created NIFI-7297: -- Summary: Mock bundle: InputStream created by MockProcessSession doesn't count available() bytes Key: NIFI-7297 URL: https://issues.apache.org/jira/browse/NIFI-7297 Project: Apache NiFi Issue Type: Bug Components: Core Framework Reporter: Ilya Kovalev The `InputStream` returned by MockProcessSession.read(FlowFile file) doesn't report its available bytes. When testing a custom processor that reads the flowfile as in the following example: BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); while (reader.ready()){ the tests fail because available() always returns 0 and the reader's buffer is empty, so ready() never returns true.
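The behaviour described in NIFI-7297 can be reproduced without NiFi. The following is a minimal sketch, not the project's code: `ReadyVersusReadLine` and `zeroAvailable` are hypothetical names, and the anonymous stream only stands in for a stream whose available() always returns 0 (the default in java.io.InputStream). It compares a `reader.ready()` loop with a `readLine() != null` loop over the same data.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

class ReadyVersusReadLine {

    /** Hypothetical stand-in for a stream that never reports available bytes. */
    static InputStream zeroAvailable(byte[] data) {
        final InputStream delegate = new ByteArrayInputStream(data);
        return new InputStream() {
            @Override
            public int read() throws IOException {
                return delegate.read();
            }

            @Override
            public int read(byte[] b, int off, int len) throws IOException {
                return delegate.read(b, off, len);
            }
            // available() is deliberately not overridden:
            // the InputStream default implementation returns 0.
        };
    }

    /** Counts lines using the ready() loop from the issue description. */
    static int countLinesWithReady(InputStream in) throws IOException {
        int lines = 0;
        try (BufferedReader reader =
                 new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            while (reader.ready()) {
                reader.readLine();
                lines++;
            }
        }
        return lines;
    }

    /** Counts lines by reading until end of stream, independent of available(). */
    static int countLinesWithReadLine(InputStream in) throws IOException {
        int lines = 0;
        try (BufferedReader reader =
                 new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            while (reader.readLine() != null) {
                lines++;
            }
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "a\nb\nc\n".getBytes(StandardCharsets.UTF_8);
        System.out.println(countLinesWithReady(zeroAvailable(data)));
        System.out.println(countLinesWithReadLine(zeroAvailable(data)));
        System.out.println(countLinesWithReady(new ByteArrayInputStream(data)));
    }
}
```

With the zero-available stream the ready() loop reads nothing, while the readLine() loop still sees every line. Processor code that loops on readLine() does not depend on available() at all, which may be a useful workaround in tests regardless of how the mock is fixed.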
[GitHub] [nifi] mkazia opened a new pull request #4171: NIFI-7294 Address deprecation issues in solrj and httpclient
mkazia opened a new pull request #4171: NIFI-7294 Address deprecation issues in solrj and httpclient URL: https://github.com/apache/nifi/pull/4171 Thank you for submitting a contribution to Apache NiFi. Please provide a short description of the PR here: Description of PR _Address deprecation issues in solrj and httpclient; fixes bug NIFI-7294._ Some calls to deprecated methods in httpclient were resulting in UnsupportedOperationException. Use the new API calls in both httpclient and solrj. Add an integration test to include test coverage for org.apache.nifi.processors.solr.SolrUtils.createClient In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [x] Does your PR title start with **NIFI-XXXX** where **XXXX** is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [x] Has your PR been rebased against the latest commit within the target branch (typically `master`)? - [ ] Is your initial contribution a single, squashed commit? _Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not `squash` or use `--force` when pushing to allow for clean monitoring of changes._ ### For code changes: - [x] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder? - [x] Have you written or updated unit tests to verify your changes? - [x] Have you verified that the full build is successful on both JDK 8 and JDK 11? - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the `LICENSE` file, including the main `LICENSE` file under `nifi-assembly`?
- [ ] If applicable, have you updated the `NOTICE` file, including the main `NOTICE` file found under `nifi-assembly`? - [ ] If adding new Properties, have you added `.displayName` in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
[GitHub] [nifi] mkazia closed pull request #4170: NIFI-7294 Address deprecation issues in solrj and httpclient
mkazia closed pull request #4170: NIFI-7294 Address deprecation issues in solrj and httpclient URL: https://github.com/apache/nifi/pull/4170
[GitHub] [nifi] mkazia commented on issue #4170: NIFI-7294 Address deprecation issues in solrj and httpclient
mkazia commented on issue #4170: NIFI-7294 Address deprecation issues in solrj and httpclient URL: https://github.com/apache/nifi/pull/4170#issuecomment-606132267 Closing this so that I can open a new clean PR with just one commit.
[GitHub] [nifi] bbende commented on issue #4170: NIFI-7294 Address deprecation issues in solrj and httpclient
bbende commented on issue #4170: NIFI-7294 Address deprecation issues in solrj and httpclient URL: https://github.com/apache/nifi/pull/4170#issuecomment-606110930 A couple of things from reviewing... 1) It seems like this branch includes the commits for the previous work on upgrading SolrJ, there should only be one commit in this PR for the new changes for NIFI-7294. 2) I think there is a potential issue with our use of the `HttpClientUtil` static methods... since we set static values in there, once one Solr processor has been created that is configured with Kerberos or SSL, then that static state has been set in `HttpClientUtil`, and now if someone made a second Solr processor that wasn't using Kerberos or SSL, that stuff would still be set in `HttpClientUtil`. It looks like there is a method `resetHttpClientBuilder()` that we should call at the beginning of `createSolrClient`, but we also need to ensure we are inside a synchronized block so that two different Solr processors don't alter `HttpClientUtil` at the same time. We can do that by making `createSolrClient` synchronized.
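The second point of the review can be illustrated with a small, self-contained sketch. It does not use SolrJ: `StaticHttpConfig` is a hypothetical stand-in for a utility class that keeps client settings in static fields, and `createClientLeaky` / `createClient` are illustrative names, not the NiFi methods. The sketch only shows the pattern suggested in the comment: reset the shared static state at the start of client creation, and make the creation method synchronized.

```java
class SharedBuilderResetSketch {

    /** Hypothetical stand-in for a utility class holding settings in static fields. */
    static final class StaticHttpConfig {
        private static String sslContextName;     // null means no SSL configured
        private static String kerberosPrincipal;  // null means no Kerberos configured

        static void reset() {
            sslContextName = null;
            kerberosPrincipal = null;
        }

        static void setSsl(String name) {
            sslContextName = name;
        }

        static void setKerberos(String principal) {
            kerberosPrincipal = principal;
        }

        static String describe() {
            return "ssl=" + sslContextName + ", kerberos=" + kerberosPrincipal;
        }
    }

    /** Leaky variant: settings from an earlier caller survive into later clients. */
    static String createClientLeaky(String ssl, String kerberos) {
        if (ssl != null) {
            StaticHttpConfig.setSsl(ssl);
        }
        if (kerberos != null) {
            StaticHttpConfig.setKerberos(kerberos);
        }
        return StaticHttpConfig.describe();
    }

    /**
     * Suggested pattern: reset the shared state first, and hold a lock so that
     * two callers cannot interleave their changes to the static fields.
     */
    static synchronized String createClient(String ssl, String kerberos) {
        StaticHttpConfig.reset();
        if (ssl != null) {
            StaticHttpConfig.setSsl(ssl);
        }
        if (kerberos != null) {
            StaticHttpConfig.setKerberos(kerberos);
        }
        return StaticHttpConfig.describe();
    }

    public static void main(String[] args) {
        System.out.println(createClientLeaky("my-ssl-context", null));
        System.out.println(createClientLeaky(null, null)); // still reports the SSL context
        System.out.println(createClient(null, null));      // clean state after reset
    }
}
```

The lock only helps if every code path that touches the static state goes through the synchronized method; in this sketch the leaky variant is left unsynchronized purely for contrast.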
[GitHub] [nifi] mkazia commented on a change in pull request #4170: NIFI-7294 Address deprecation issues in solrj and httpclient
mkazia commented on a change in pull request #4170: NIFI-7294 Address deprecation issues in solrj and httpclient URL: https://github.com/apache/nifi/pull/4170#discussion_r400330184 ## File path: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/QuerySolrIT.java ## @@ -36,25 +39,29 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.Mockito; import org.xmlunit.matchers.CompareMatcher; +import javax.net.ssl.SSLContext; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStreamReader; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.text.SimpleDateFormat; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Optional; import java.util.TimeZone; +import static org.hamcrest.MatcherAssert.assertThat; Review comment: assertThat in junit is deprecated. See https://junit.org/junit4/javadoc/latest/org/junit/Assert.html#assertThat(T,%20org.hamcrest.Matcher)
[GitHub] [nifi] mkazia commented on a change in pull request #4170: NIFI-7294 Address deprecation issues in solrj and httpclient
mkazia commented on a change in pull request #4170: NIFI-7294 Address deprecation issues in solrj and httpclient URL: https://github.com/apache/nifi/pull/4170#discussion_r400329921 ## File path: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestQuerySolr.java ## @@ -48,9 +48,9 @@ import java.util.Map; import java.util.TimeZone; +import static org.hamcrest.MatcherAssert.assertThat; Review comment: assertThat in junit is deprecated. See https://junit.org/junit4/javadoc/latest/org/junit/Assert.html#assertThat(T,%20org.hamcrest.Matcher)
[GitHub] [nifi] mkazia commented on a change in pull request #4170: NIFI-7294 Address deprecation issues in solrj and httpclient
mkazia commented on a change in pull request #4170: NIFI-7294 Address deprecation issues in solrj and httpclient URL: https://github.com/apache/nifi/pull/4170#discussion_r400330050 ## File path: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java ## @@ -48,8 +48,8 @@ import java.util.Locale; import java.util.TimeZone; +import static org.hamcrest.MatcherAssert.assertThat; Review comment: assertThat in junit is deprecated. See https://junit.org/junit4/javadoc/latest/org/junit/Assert.html#assertThat(T,%20org.hamcrest.Matcher)
[GitHub] [nifi] pvillard31 commented on a change in pull request #4170: NIFI-7294 Address deprecation issues in solrj and httpclient
pvillard31 commented on a change in pull request #4170: NIFI-7294 Address deprecation issues in solrj and httpclient URL: https://github.com/apache/nifi/pull/4170#discussion_r400326691 ## File path: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/QuerySolrIT.java ## @@ -36,25 +39,29 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.Mockito; import org.xmlunit.matchers.CompareMatcher; +import javax.net.ssl.SSLContext; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStreamReader; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.text.SimpleDateFormat; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Optional; import java.util.TimeZone; +import static org.hamcrest.MatcherAssert.assertThat; Review comment: Is this change of class required?
[GitHub] [nifi] pvillard31 commented on a change in pull request #4170: NIFI-7294 Address deprecation issues in solrj and httpclient
pvillard31 commented on a change in pull request #4170: NIFI-7294 Address deprecation issues in solrj and httpclient URL: https://github.com/apache/nifi/pull/4170#discussion_r400326224 ## File path: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestGetSolr.java ## @@ -48,8 +48,8 @@ import java.util.Locale; import java.util.TimeZone; +import static org.hamcrest.MatcherAssert.assertThat; Review comment: Is this change required?
[GitHub] [nifi] pvillard31 commented on a change in pull request #4170: NIFI-7294 Address deprecation issues in solrj and httpclient
pvillard31 commented on a change in pull request #4170: NIFI-7294 Address deprecation issues in solrj and httpclient URL: https://github.com/apache/nifi/pull/4170#discussion_r400326052 ## File path: nifi-nar-bundles/nifi-solr-bundle/nifi-solr-processors/src/test/java/org/apache/nifi/processors/solr/TestQuerySolr.java ## @@ -48,9 +48,9 @@ import java.util.Map; import java.util.TimeZone; +import static org.hamcrest.MatcherAssert.assertThat; Review comment: I believe this change is not required, right?
[jira] [Commented] (NIFI-7259) DeleteAzureDataLakeStorage processor to provide native delete support for Azure Data lake Gen 2 Storage
[ https://issues.apache.org/jira/browse/NIFI-7259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17071105#comment-17071105 ] Bill SAndman commented on NIFI-7259: Apologies. Missed that totally. Thanks for the correction. > DeleteAzureDataLakeStorage processor to provide native delete support for > Azure Data lake Gen 2 Storage > --- > > Key: NIFI-7259 > URL: https://issues.apache.org/jira/browse/NIFI-7259 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions >Affects Versions: 1.12.0 >Reporter: Muazma Zahid >Assignee: Muazma Zahid >Priority: Major > Labels: azure > > [Azure Data Lake Storage > Gen2|[https://docs.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-introduction]] > makes Azure Storage the foundation for building enterprise data lakes on > Azure. Nifi supports Azure Blob storage but has no native support for ADLS > Gen 2 > This processor will be followed by Get/List to provide native support for > ADLS Gen 2 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (NIFI-7259) DeleteAzureDataLakeStorage processor to provide native delete support for Azure Data lake Gen 2 Storage
[ https://issues.apache.org/jira/browse/NIFI-7259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17071099#comment-17071099 ] Muazma Zahid commented on NIFI-7259: [~wsandman] Put PR is already in review: [https://github.com/apache/nifi/pull/4126/] Fetch and List would be next after Delete. > DeleteAzureDataLakeStorage processor to provide native delete support for > Azure Data lake Gen 2 Storage > --- > > Key: NIFI-7259 > URL: https://issues.apache.org/jira/browse/NIFI-7259 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions >Affects Versions: 1.12.0 >Reporter: Muazma Zahid >Assignee: Muazma Zahid >Priority: Major > Labels: azure > > [Azure Data Lake Storage > Gen2|[https://docs.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-introduction]] > makes Azure Storage the foundation for building enterprise data lakes on > Azure. Nifi supports Azure Blob storage but has no native support for ADLS > Gen 2 > This processor will be followed by Get/List to provide native support for > ADLS Gen 2 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [nifi-registry] thenatog opened a new pull request #270: NIFIREG-364 - Upgrade maven checkstyle version
thenatog opened a new pull request #270: NIFIREG-364 - Upgrade maven checkstyle version URL: https://github.com/apache/nifi-registry/pull/270 Upgraded maven com.puppycrawl.tools:checkstyle version to v8.31. Some rules have been fully deprecated since v8.21, so these were removed from our rules. LineLength had to be moved outside of the TreeWalker module to its own module.
[GitHub] [nifi] anaylor commented on issue #4130: NIFI-6235 - Prioritizing standard content war loading order
anaylor commented on issue #4130: NIFI-6235 - Prioritizing standard content war loading order URL: https://github.com/apache/nifi/pull/4130#issuecomment-606089157 Hey @mcgilman, thanks for taking a look. I am not sure whether adding additional known content viewers for the same media type would cause an overwrite, but the current behavior does not allow additional content viewer types to be added to the mimeMapping unless the WAR that contains those content viewers is loaded after the nifi-standard-content viewer. I am not sure if this is the correct fix for Apache NiFi, but it does resolve the problem of loading custom content viewers in additional WARs. Let me know if this makes sense. Thanks, Andy
[GitHub] [nifi] mkazia opened a new pull request #4170: NIFI-7294 Address deprecation issues in solrj and httpclient
mkazia opened a new pull request #4170: NIFI-7294 Address deprecation issues in solrj and httpclient URL: https://github.com/apache/nifi/pull/4170 Thank you for submitting a contribution to Apache NiFi. Please provide a short description of the PR here: Description of PR _ Fixes bug NIFI-7294._ Address deprecation issues in solrj and httpclient Some calls to deprecated methods in httpclient were resulting in UnsupportedOperationException. Use the new API calls in both httpclient and solrj. Add an integration test to include test coverage for org.apache.nifi.processors.solr.SolrUtils.createClient In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [x] Does your PR title start with **NIFI-** where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [ ] Has your PR been rebased against the latest commit within the target branch (typically `master`)? - [ ] Is your initial contribution a single, squashed commit? _Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not `squash` or use `--force` when pushing to allow for clean monitoring of changes._ ### For code changes: - [x] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder? - [x] Have you written or updated unit tests to verify your changes? - [x] Have you verified that the full build is successful on both JDK 8 and JDK 11? - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the `LICENSE` file, including the main `LICENSE` file under `nifi-assembly`? 
- [ ] If applicable, have you updated the `NOTICE` file, including the main `NOTICE` file found under `nifi-assembly`? - [ ] If adding new Properties, have you added `.displayName` in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
[jira] [Commented] (NIFI-7259) DeleteAzureDataLakeStorage processor to provide native delete support for Azure Data lake Gen 2 Storage
[ https://issues.apache.org/jira/browse/NIFI-7259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17071083#comment-17071083 ] Bill SAndman commented on NIFI-7259: Just an observation, as delete is a great place to start, followed by Get and List. Please also include Put and Fetch > DeleteAzureDataLakeStorage processor to provide native delete support for > Azure Data lake Gen 2 Storage > --- > > Key: NIFI-7259 > URL: https://issues.apache.org/jira/browse/NIFI-7259 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions >Affects Versions: 1.12.0 >Reporter: Muazma Zahid >Assignee: Muazma Zahid >Priority: Major > Labels: azure > > [Azure Data Lake Storage > Gen2|[https://docs.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-introduction]] > makes Azure Storage the foundation for building enterprise data lakes on > Azure. Nifi supports Azure Blob storage but has no native support for ADLS > Gen 2 > This processor will be followed by Get/List to provide native support for > ADLS Gen 2 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (NIFIREG-375) Improve Github Actions Integration and remove Travis
Joe Witt created NIFIREG-375: Summary: Improve Github Actions Integration and remove Travis Key: NIFIREG-375 URL: https://issues.apache.org/jira/browse/NIFIREG-375 Project: NiFi Registry Issue Type: Task Reporter: Joe Witt Assignee: Joe Witt -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (NIFI-6051) FetchFile fails on filenames containing characters with orthographic marks
[ https://issues.apache.org/jira/browse/NIFI-6051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17071013#comment-17071013 ] Fan Li edited comment on NIFI-6051 at 3/30/20, 3:16 PM: Is there a recommended workaround? I tried to read files within an "ExecuteScript" processor and it appeared to fail for the same reason. was (Author: lifan): Is there a recommended workaround? > FetchFile fails on filenames containing characters with orthographic marks > -- > > Key: NIFI-6051 > URL: https://issues.apache.org/jira/browse/NIFI-6051 > Project: Apache NiFi > Issue Type: Bug > Components: Extensions >Affects Versions: 1.8.0 >Reporter: Andy LoPresto >Assignee: Juan Carlos de la Prada de Haro >Priority: Major > Labels: encoding, file, utf-8 > Attachments: Sin título.png > > > As reported in the Slack channel: > {quote} > Hi all! I'm trying to use FetchFile processor to read some files but it's not > possible when directories include special characters. I tried to solve that > issue modifying the character set to UTF8. To do so I add > "java.arg.8=-Dfile.encoding=UTF8" argument in the bootstrap.conf file but it > doesn't work properly. > {quote} > The processor (and any other that interacts with the file system) should be > fixed to properly handle filenames such as: > * {{información.txt}} > * {{líneas.csv}} > * {{tack_så_mycket.json}} > * {{słowo.xml}}
[jira] [Commented] (NIFI-7294) Flows with SolrProcessor configured to use SSLContextService is failing
[ https://issues.apache.org/jira/browse/NIFI-7294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17071031#comment-17071031 ] Mubashir Kazia commented on NIFI-7294: -- [~bbende]. Yes. I have the code change ready, I'm trying to get an integration test working to make sure the code is covered by a test. > Flows with SolrProcessor configured to use SSLContextService is failing > --- > > Key: NIFI-7294 > URL: https://issues.apache.org/jira/browse/NIFI-7294 > Project: Apache NiFi > Issue Type: Bug > Components: Extensions >Affects Versions: 1.12.0 >Reporter: Mubashir Kazia >Priority: Major > > After applying the fix in NIFI-7269, flows using SolrProcessors that are > configured to use SSLContextService are failing with exception: > {code} > java.lang.UnsupportedOperationException: null > at > org.apache.http.impl.client.InternalHttpClient$1.getSchemeRegistry(InternalHttpClient.java:239) > at > org.apache.nifi.processors.solr.SolrUtils.createSolrClient(SolrUtils.java:236) > at > org.apache.nifi.processors.solr.SolrProcessor.createSolrClient(SolrProcessor.java:147) > at > org.apache.nifi.processors.solr.SolrProcessor.onScheduled(SolrProcessor.java:77) > at sun.reflect.GeneratedMethodAccessor166.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:142) > at > org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:130) > at > org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:75) > at > org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotation(ReflectionUtils.java:52) > at > org.apache.nifi.controller.StandardProcessorNode.lambda$initiateStart$4(StandardProcessorNode.java:1515) > at org.apache.nifi.engine.FlowEngine$3.call(FlowEngine.java:123) > at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745){code} > This is because of the new httpclient version that was pulled as a dependency > as part of the solrj upgrade in NIFI-7269. The getConnectionManager method in > HttpClient is deprecated, and the way to set the scheme registry has changed > in the new version of HttpClient. The problematic code is > {code} > httpClient.getConnectionManager().getSchemeRegistry().register(httpsScheme); > {code} > in the createSolrClient function. There is no test coverage for this function, > which is why the problem was not detected in the code changes for NIFI-7269. >
[jira] [Commented] (NIFI-7294) Flows with SolrProcessor configured to use SSLContextService is failing
[ https://issues.apache.org/jira/browse/NIFI-7294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17071026#comment-17071026 ] Bryan Bende commented on NIFI-7294: --- [~mkazia] thanks for reporting this, are you working on a pull request? > Flows with SolrProcessor configured to use SSLContextService is failing > --- > > Key: NIFI-7294 > URL: https://issues.apache.org/jira/browse/NIFI-7294 > Project: Apache NiFi > Issue Type: Bug > Components: Extensions >Affects Versions: 1.12.0 >Reporter: Mubashir Kazia >Priority: Major > > After applying the fix in NIFI-7269, flows using SolrProcessors that are > configured to use SSLContextService are failing with exception: > {code} > java.lang.UnsupportedOperationException: null > at > org.apache.http.impl.client.InternalHttpClient$1.getSchemeRegistry(InternalHttpClient.java:239) > at > org.apache.nifi.processors.solr.SolrUtils.createSolrClient(SolrUtils.java:236) > at > org.apache.nifi.processors.solr.SolrProcessor.createSolrClient(SolrProcessor.java:147) > at > org.apache.nifi.processors.solr.SolrProcessor.onScheduled(SolrProcessor.java:77) > at sun.reflect.GeneratedMethodAccessor166.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:142) > at > org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:130) > at > org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotations(ReflectionUtils.java:75) > at > org.apache.nifi.util.ReflectionUtils.invokeMethodsWithAnnotation(ReflectionUtils.java:52) > at > org.apache.nifi.controller.StandardProcessorNode.lambda$initiateStart$4(StandardProcessorNode.java:1515) > at org.apache.nifi.engine.FlowEngine$3.call(FlowEngine.java:123) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745){code} > This is because of the new httpclient version that was pulled as a dependency > as part of the solrj upgrade in NIFI-7269. The getConnectionManager method in > HttpClient is deprecated, and the way to set the scheme registry has changed > in the new version of HttpClient. The problematic code is > {code} > httpClient.getConnectionManager().getSchemeRegistry().register(httpsScheme); > {code} > in the createSolrClient function. There is no test coverage for this function, > which is why the problem was not detected in the code changes for NIFI-7269. >
[jira] [Commented] (NIFI-6051) FetchFile fails on filenames containing characters with orthographic marks
[ https://issues.apache.org/jira/browse/NIFI-6051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17071013#comment-17071013 ] Fan Li commented on NIFI-6051: -- Is there a recommended workaround? > FetchFile fails on filenames containing characters with orthographic marks > -- > > Key: NIFI-6051 > URL: https://issues.apache.org/jira/browse/NIFI-6051 > Project: Apache NiFi > Issue Type: Bug > Components: Extensions >Affects Versions: 1.8.0 >Reporter: Andy LoPresto >Assignee: Juan Carlos de la Prada de Haro >Priority: Major > Labels: encoding, file, utf-8 > Attachments: Sin título.png > > > As reported in the Slack channel: > {quote} > Hi all! I'm trying to use FetchFile processor to read some files but it's not > possible when directories include special characters. I tried to solve that > issue modifying the character set to UTF8. To do so I add > "java.arg.8=-Dfile.encoding=UTF8" argument in the bootstrap.conf file but it > doesn't work properly. > {quote} > The processor (and any other that interacts with the file system) should be > fixed to properly handle filenames such as: > * {{información.txt}} > * {{líneas.csv}} > * {{tack_så_mycket.json}} > * {{słowo.xml}}
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400205341 ## File path: libminifi/include/utils/MinifiConcurrentQueue.h ## @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H +#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H + +#include +#include +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +template +class ConcurrentQueue { + public: + ConcurrentQueue() = default; + virtual ~ConcurrentQueue() = default; + + ConcurrentQueue(const ConcurrentQueue& other) = delete; + ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete; + ConcurrentQueue(ConcurrentQueue&& other) +: ConcurrentQueue(std::move(other), std::lock_guard(other.mutex_)) {} + + ConcurrentQueue& operator=(ConcurrentQueue&& other) { +if (this != &other) { + std::lock(mtx_, other.mtx_); + std::lock_guard lk1(mtx_, std::adopt_lock); + std::lock_guard lk2(other.mtx_, std::adopt_lock); + queue_.swap(other.queue_); +} +return *this; + } + + bool tryDequeue(T& out) { +std::unique_lock lck(mtx_); +return tryDequeue(lck, out); + } + + bool empty() const { +std::lock_guard guard(mtx_); +return queue_.empty(); + } + + size_t size() const { +std::lock_guard guard(mtx_); +return queue_.size(); + } + + void clear() { +std::lock_guard guard(mtx_); +queue_.clear(); + } + + template + void enqueue(Args&&... args) { +std::lock_guard guard(mtx_); +queue_.emplace_back(std::forward(args)...); + } + + private: + ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&) +: queue_( std::move(other.queue_) ) {} + + protected: + bool tryDequeue(std::unique_lock& lck, T& out) { +if (!lck.owns_lock()) { + throw std::logic_error("Caller of protected ConcurrentQueue::tryDequeue should own the lock!"); +} +if (queue_.empty()) { + return false; +} +out = std::move(queue_.front()); +queue_.pop_front(); +return true; + } + std::deque queue_; + mutable std::mutex mtx_; Review comment: I think that topic is way more general than this PR. I would be happy to a see a doc about the interfaces we consider public API, but that doesn't exist yet. 
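For readers following the review of MinifiConcurrentQueue.h above, the locking pattern in the quoted diff can be sketched in isolation. This is a minimal illustration under stated assumptions, not the code from PR #746: the class name `LockedQueue` is hypothetical, the template arguments and include names (which the archive stripped from the quoted diff) are guesses, and the move constructor is left out for brevity. It shows a mutex-guarded `std::deque` with a non-blocking `tryDequeue`, plus a move assignment that acquires both mutexes through `std::lock` and `std::adopt_lock`, as the diff does.

```cpp
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical illustration of a mutex-guarded queue; not the MiNiFi class.
template <typename T>
class LockedQueue {
 public:
  LockedQueue() = default;
  LockedQueue(const LockedQueue&) = delete;
  LockedQueue& operator=(const LockedQueue&) = delete;

  // Move assignment must hold both mutexes. std::lock acquires them
  // with a deadlock-avoidance algorithm; adopt_lock hands ownership
  // to the guards so both are released on scope exit.
  LockedQueue& operator=(LockedQueue&& other) {
    if (this != &other) {
      std::lock(mtx_, other.mtx_);
      std::lock_guard<std::mutex> lk1(mtx_, std::adopt_lock);
      std::lock_guard<std::mutex> lk2(other.mtx_, std::adopt_lock);
      queue_.swap(other.queue_);
    }
    return *this;
  }

  // Constructs the element in place at the back of the queue.
  template <typename... Args>
  void enqueue(Args&&... args) {
    std::lock_guard<std::mutex> guard(mtx_);
    queue_.emplace_back(std::forward<Args>(args)...);
  }

  // Non-blocking: returns false immediately when the queue is empty.
  bool tryDequeue(T& out) {
    std::lock_guard<std::mutex> guard(mtx_);
    if (queue_.empty()) {
      return false;
    }
    out = std::move(queue_.front());
    queue_.pop_front();
    return true;
  }

  std::size_t size() const {
    std::lock_guard<std::mutex> guard(mtx_);
    return queue_.size();
  }

 private:
  mutable std::mutex mtx_;  // mutable so const members can lock it
  std::deque<T> queue_;
};
```

Because the move assignment swaps the underlying containers, the moved-from queue ends up holding whatever the target held before, which is empty for a freshly constructed target.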
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400204455 ## File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp ## @@ -0,0 +1,158 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include "../TestBase.h" +#include "utils/MinifiConcurrentQueue.h" +#include "utils/StringUtils.h" + +using namespace org::apache::nifi::minifi::utils; + +TEST_CASE("TestConqurrentQueue::testQueue", "[TestQueue]") { + utils::ConcurrentQueue queue; + std::vector results; + + std::thread producer([&queue]() { + queue.enqueue("ba"); + std::this_thread::sleep_for(std::chrono::milliseconds(3)); Review comment: Done. Added a new test case that deals with a lot of elements without sleeps.
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400203410 ## File path: libminifi/include/utils/MinifiConcurrentQueue.h ## @@ -0,0 +1,186 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H +#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H + +#include +#include +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + + +// Provides a queue API and guarantees no race conditions in case of multiple producers and consumers. 
+template +class ConcurrentQueue { + public: + explicit ConcurrentQueue() = default; + + ConcurrentQueue(const ConcurrentQueue& other) = delete; + ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete; + ConcurrentQueue(ConcurrentQueue&& other) +: ConcurrentQueue(std::move(other), std::lock_guard(other.mutex_)) {} + + ConcurrentQueue& operator=(ConcurrentQueue&& other) { +if (this != &other) { + std::lock(mtx_, other.mtx_); + std::lock_guard lk1(mtx_, std::adopt_lock); + std::lock_guard lk2(other.mtx_, std::adopt_lock); + queue_.swap(other.queue_); +} +return *this; + } + + bool tryDequeue(T& out) { +std::unique_lock lck(mtx_); +return tryDequeueImpl(lck, out); + } + + bool empty() const { +std::unique_lock lck(mtx_); +return queue_.emptyImpl(lck); + } + + size_t size() const { +std::lock_guard guard(mtx_); +return queue_.size(); + } + + void clear() { +std::lock_guard guard(mtx_); +queue_.clear(); + } + + template + void enqueue(Args&&... args) { +std::lock_guard guard(mtx_); +queue_.emplace_back(std::forward(args)...); + } + + private: + ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&) +: queue_( std::move(other.queue_) ) {} + + protected: + void checkLock(std::unique_lock& lck) const { +if (!lck.owns_lock()) { + throw std::logic_error("Caller of protected functions of ConcurrentQueue should own the lock!"); +} + } + + bool tryDequeueImpl(std::unique_lock& lck, T& out) { +checkLock(lck); +if (queue_.empty()) { + return false; +} +out = std::move(queue_.front()); +queue_.pop_front(); +return true; + } + + bool emptyImpl(std::unique_lock& lck) const { +checkLock(lck); +return queue_.empty(); + } + + mutable std::mutex mtx_; + private: + std::deque queue_; +}; + + +// A ConcurrentQueue extended with a condition variable to be able to block and wait for incoming data Review comment: Added This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400203336 ## File path: libminifi/include/utils/MinifiConcurrentQueue.h ## @@ -0,0 +1,186 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H +#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H + +#include +#include +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + + +// Provides a queue API and guarantees no race conditions in case of multiple producers and consumers. 
+template +class ConcurrentQueue { + public: + explicit ConcurrentQueue() = default; + + ConcurrentQueue(const ConcurrentQueue& other) = delete; + ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete; + ConcurrentQueue(ConcurrentQueue&& other) +: ConcurrentQueue(std::move(other), std::lock_guard(other.mutex_)) {} + + ConcurrentQueue& operator=(ConcurrentQueue&& other) { +if (this != &other) { + std::lock(mtx_, other.mtx_); + std::lock_guard lk1(mtx_, std::adopt_lock); + std::lock_guard lk2(other.mtx_, std::adopt_lock); + queue_.swap(other.queue_); +} +return *this; + } + + bool tryDequeue(T& out) { +std::unique_lock lck(mtx_); +return tryDequeueImpl(lck, out); + } + + bool empty() const { +std::unique_lock lck(mtx_); +return queue_.emptyImpl(lck); + } + + size_t size() const { +std::lock_guard guard(mtx_); +return queue_.size(); + } + + void clear() { +std::lock_guard guard(mtx_); +queue_.clear(); + } + + template + void enqueue(Args&&... args) { +std::lock_guard guard(mtx_); +queue_.emplace_back(std::forward(args)...); + } + + private: + ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&) +: queue_( std::move(other.queue_) ) {} + + protected: + void checkLock(std::unique_lock& lck) const { +if (!lck.owns_lock()) { + throw std::logic_error("Caller of protected functions of ConcurrentQueue should own the lock!"); +} + } + + bool tryDequeueImpl(std::unique_lock& lck, T& out) { +checkLock(lck); +if (queue_.empty()) { + return false; +} +out = std::move(queue_.front()); +queue_.pop_front(); +return true; + } + + bool emptyImpl(std::unique_lock& lck) const { +checkLock(lck); +return queue_.empty(); + } + + mutable std::mutex mtx_; + private: + std::deque queue_; +}; + + +// A ConcurrentQueue extended with a condition variable to be able to block and wait for incoming data +template +class ConditionConcurrentQueue : private ConcurrentQueue { + public: + explicit ConditionConcurrentQueue(bool start = true) : ConcurrentQueue{}, running_{start} {} + + 
ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete; + ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = delete; + ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete; + ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = delete; + + using ConcurrentQueue::size; + using ConcurrentQueue::empty; + using ConcurrentQueue::clear; + + + template + void enqueue(Args&&... args) { +ConcurrentQueue::enqueue(std::forward(args)...); +if (running_) { + cv_.notify_one(); +} + } + + bool dequeueWait(T& out) { +std::unique_lock lck(this->mtx_); +cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); }); // Only wake up if there is something to return or stopped +return running_ && ConcurrentQueue::tryDequeueImpl(lck, out); + } + + template< class Rep, class Period > + bool dequeueWaitFor(T& out, const std::chrono::duration& time) { +std::unique_lock lck(this->mtx_); +cv_.wait_for(lck, time, [this, &lck]{ return !running_ || !this->emptyImpl(lck); }); // Wake up with timeout or in case there is something to do +return running_ && ConcurrentQueue::tryDequeueImpl(lck, out); + } + + bool tryDequeue(T& out) { +std::unique_lock lck(this->mtx_); +return running_ && ConcurrentQ
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400203526

## File path: libminifi/include/utils/MinifiConcurrentQueue.h ##
@@ -29,11 +29,12 @@ namespace nifi {
 namespace minifi {
 namespace utils {
+
+// Provides a queue API and guarantees no race conditions in case of multiple producers and consumers.

Review comment:
Added
[jira] [Created] (NIFI-7296) BST TimeZone parsing fails, breaking webgui and API
Michael Percival created NIFI-7296:
Summary: BST TimeZone parsing fails, breaking webgui and API
Key: NIFI-7296
URL: https://issues.apache.org/jira/browse/NIFI-7296
Project: Apache NiFi
Issue Type: Bug
Components: Core UI
Affects Versions: 1.11.3
Environment: Nifi 1.11.3 running on jre-11-openjdk-11.0.4.11-1.el7_7.x86_64 and RHEL 8 cluster of 6 servers
Reporter: Michael Percival

Since the clocks changed in the UK and we moved to BST, API calls and browsing to the web GUI fail with an 'An unexpected error has occurred. Please check the logs for additional details.' error. The nifi-user.log shows the following when attempting to access the web GUI; it appears that the time zone is not being parsed properly by the web server:

Caused by: java.time.format.DateTimeParseException: Text '12:23:17 BST' could not be parsed: null
at java.base/java.time.format.DateTimeFormatter.createError(DateTimeFormatter.java:2017)
at java.base/java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1952)
at java.base/java.time.LocalDateTime.parse(LocalDateTime.java:492)
at org.apache.nifi.web.api.dto.util.TimeAdapter.unmarshal(TimeAdapter.java:55)
at org.apache.nifi.web.api.dto.util.TimeAdapter.unmarshal(TimeAdapter.java:33)
at com.fasterxml.jackson.module.jaxb.AdapterConverter.convert(AdapterConverter.java:35)
at com.fasterxml.jackson.databind.deser.std.StdDelegatingDeserializer.convertValue(StdDelegatingDeserializ$
at com.fasterxml.jackson.databind.deser.std.StdDelegatingDeserializer.deserialize(StdDelegatingDeserialize$
at com.fasterxml.jackson.databind.deser.impl.MethodProperty.deserializeAndSet(MethodProperty.java:129)
at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:288)
... 122 common frames omitted
[GitHub] [nifi] si-sun edited a comment on issue #3917: Adding GetSmbFile and PutSmbFile processors
si-sun edited a comment on issue #3917: Adding GetSmbFile and PutSmbFile processors
URL: https://github.com/apache/nifi/pull/3917#issuecomment-605107198

alright, I'll make a new one, next week

edit: see #4169
[GitHub] [nifi] si-sun opened a new pull request #4169: Adding GetSmbFile and PutSmbFile processors #2
si-sun opened a new pull request #4169: Adding GetSmbFile and PutSmbFile processors #2
URL: https://github.com/apache/nifi/pull/4169

Thank you for submitting a contribution to Apache NiFi.

Please provide a short description of the PR here:

Description of PR

Added functionality to read from and write to Samba shares with GetSmbFile and PutSmbFile processors. The advantage is not having to manage CIFS mounts on the server/nodes. The processors also support Windows-style share locks on the files during read and write. It uses the [SMBJ client library](https://github.com/hierynomus/smbj) for the Samba connection.

In order to streamline the review of the contribution we ask you to ensure the following steps have been taken:

### For all changes:
- [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
- [ ] Does your PR title start with **NIFI-** where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
- [X] Has your PR been rebased against the latest commit within the target branch (typically `master`)?
- [X] Is your initial contribution a single, squashed commit? _Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not `squash` or use `--force` when pushing to allow for clean monitoring of changes._

### For code changes:
- [X] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder?
- [X] Have you written or updated unit tests to verify your changes?
- [ ] Have you verified that the full build is successful on both JDK 8 and JDK 11?
- [X] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [X] If applicable, have you updated the `LICENSE` file, including the main `LICENSE` file under `nifi-assembly`?
- [X] If applicable, have you updated the `NOTICE` file, including the main `NOTICE` file found under `nifi-assembly`?
- [ ] If adding new Properties, have you added `.displayName` in addition to .name (programmatic access) for each of the new properties?

### For documentation related changes:
- [X] Have you ensured that format looks appropriate for the output in which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400061549

## File path: libminifi/src/utils/ThreadPool.cpp ##
@@ -64,8 +64,10 @@ void ThreadPool::run_tasks(std::shared_ptr thread) {
         }
       }
     } else {
-      std::unique_lock lock(worker_queue_mutex_);
-      tasks_available_.wait(lock);
+      // This means that the threadpool is running, but the ConcurrentQueue is stopped -> shouldn't happen
+      if (running_.load()) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+      }

Review comment:
thx for the explanation. Feel free to close the thread.
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400043152 ## File path: libminifi/include/utils/MinifiConcurrentQueue.h ## @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H +#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H + +#include +#include +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +template +class ConcurrentQueue { + public: + ConcurrentQueue() = default; + virtual ~ConcurrentQueue() = default; + + ConcurrentQueue(const ConcurrentQueue& other) = delete; + ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete; + ConcurrentQueue(ConcurrentQueue&& other) +: ConcurrentQueue(std::move(other), std::lock_guard(other.mutex_)) {} + + ConcurrentQueue& operator=(ConcurrentQueue&& other) { +if (this != &other) { + std::lock(mtx_, other.mtx_); + std::lock_guard lk1(mtx_, std::adopt_lock); + std::lock_guard lk2(other.mtx_, std::adopt_lock); + queue_.swap(other.queue_); +} +return *this; + } + + bool tryDequeue(T& out) { +std::unique_lock lck(mtx_); +return tryDequeue(lck, out); + } + + bool empty() const { +std::lock_guard guard(mtx_); +return queue_.empty(); + } + + size_t size() const { +std::lock_guard guard(mtx_); +return queue_.size(); + } + + void clear() { +std::lock_guard guard(mtx_); +queue_.clear(); + } + + template + void enqueue(Args&&... args) { +std::lock_guard guard(mtx_); +queue_.emplace_back(std::forward(args)...); + } + + private: + ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&) +: queue_( std::move(other.queue_) ) {} + + protected: + bool tryDequeue(std::unique_lock& lck, T& out) { +if (!lck.owns_lock()) { + throw std::logic_error("Caller of protected ConcurrentQueue::tryDequeue should own the lock!"); +} +if (queue_.empty()) { + return false; +} +out = std::move(queue_.front()); +queue_.pop_front(); +return true; + } + std::deque queue_; + mutable std::mutex mtx_; Review comment: It's fine and expected that we use it in libminifi. What I'd like is that we only support dependency on public members for libminifi API users and treat `protected`/`private` as "internal". 
The language doesn't provide tools for that (like package-private in Java), so that's an API documentation issue. We could also use pimpl, but that would mean a lot of boilerplate, an extra level of indirection and no inlining, so I'm against that.
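For readers unfamiliar with the pimpl trade-off mentioned in this comment, here is a minimal sketch of what it would look like. `PimplQueue` is a hypothetical, non-template class invented for illustration and is not part of the PR; the two halves would normally live in a header and a source file.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <memory>

// --- Would live in the public header (hypothetical class, not from the PR) ---
class PimplQueue {
 public:
  PimplQueue();
  ~PimplQueue();  // must be defined where Impl is a complete type
  void enqueue(int value);
  std::size_t size() const;

 private:
  struct Impl;                  // internals are invisible to API users
  std::unique_ptr<Impl> impl_;  // the extra level of indirection
};

// --- Would live in the .cpp file ---
struct PimplQueue::Impl {
  std::deque<int> items;
};

PimplQueue::PimplQueue() : impl_(new Impl()) {}
PimplQueue::~PimplQueue() = default;

// Every public member needs a forwarding definition like these (the boilerplate),
// and because the bodies are not visible in the header they are not inlined
// across translation units without link-time optimization.
void PimplQueue::enqueue(int value) { impl_->items.push_back(value); }
std::size_t PimplQueue::size() const { return impl_->items.size(); }
```

The sketch sidesteps one more complication: a class template such as the queue under review generally needs its definitions in the header, which makes hiding them behind an opaque pointer harder still.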
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400060410 ## File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp ## @@ -0,0 +1,158 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include "../TestBase.h" +#include "utils/MinifiConcurrentQueue.h" +#include "utils/StringUtils.h" + +using namespace org::apache::nifi::minifi::utils; + +TEST_CASE("TestConqurrentQueue::testQueue", "[TestQueue]") { + utils::ConcurrentQueue queue; + std::vector results; + + std::thread producer([&queue]() { + queue.enqueue("ba"); + std::this_thread::sleep_for(std::chrono::milliseconds(3)); Review comment: I see your point, but we should cover data races as well. Can we have at least a handful of test cases with rapid insertions, i.e. with no sleep, to cover both problems? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
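A rapid-insertion test of the kind requested above could be sketched roughly as follows. This is only an illustration under stated assumptions: `MiniQueue` is a hypothetical stand-in that models just `enqueue` and `tryDequeue`, `rapidInsertion` is an invented helper, and plain `assert` is used instead of the project's test framework.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for the queue under test.
class MiniQueue {
 public:
  void enqueue(int value) {
    std::lock_guard<std::mutex> guard(mtx_);
    items_.push_back(value);
  }

  bool tryDequeue(int& out) {
    std::lock_guard<std::mutex> guard(mtx_);
    if (items_.empty()) {
      return false;
    }
    out = items_.front();
    items_.pop_front();
    return true;
  }

 private:
  std::mutex mtx_;
  std::deque<int> items_;
};

// Producers enqueue back to back with no sleeps while consumers drain concurrently.
// Returns the number of elements that were consumed.
std::size_t rapidInsertion(std::size_t producers, std::size_t consumers, std::size_t perProducer) {
  MiniQueue queue;
  const std::size_t expected = producers * perProducer;
  std::atomic<std::size_t> consumed{0};
  std::vector<std::thread> threads;

  for (std::size_t p = 0; p < producers; ++p) {
    threads.emplace_back([&queue, perProducer] {
      for (std::size_t i = 0; i < perProducer; ++i) {
        queue.enqueue(static_cast<int>(i));
      }
    });
  }
  for (std::size_t c = 0; c < consumers; ++c) {
    threads.emplace_back([&queue, &consumed, expected] {
      int value = 0;
      // Each successful dequeue is counted once; loop until every element is accounted for.
      while (consumed.load() < expected) {
        if (queue.tryDequeue(value)) {
          ++consumed;
        } else {
          std::this_thread::yield();
        }
      }
    });
  }
  for (auto& t : threads) {
    t.join();
  }
  return consumed.load();
}
```

A passing run does not prove the absence of data races; running such a test under a tool like ThreadSanitizer is what would actually surface them.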
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400045447

## File path: libminifi/include/utils/MinifiConcurrentQueue.h ##
@@ -29,11 +29,12 @@ namespace nifi {
 namespace minifi {
 namespace utils {
+
+// Provides a queue API and guarantees no race conditions in case of multiple producers and consumers.

Review comment:
I would add:
- Guarantees that the elements are dequeued in the order of insertion
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400051711

## File path: libminifi/include/utils/MinifiConcurrentQueue.h ##
@@ -119,10 +133,22 @@ class ConditionConcurrentQueue : private ConcurrentQueue<T> {
     }
   }

-  bool dequeue(T& out) {
+  bool dequeueWait(T& out) {
+    std::unique_lock<std::mutex> lck(this->mtx_);
+    cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); });  // Only wake up if there is something to return or stopped
+    return running_ && ConcurrentQueue<T>::tryDequeueImpl(lck, out);
+  }
+
+  template< class Rep, class Period >
+  bool dequeueWaitFor(T& out, const std::chrono::duration<Rep, Period>& time) {

Review comment:
I like that you used a template here, not just a concrete duration type like `milliseconds`, which is more common in the code base.
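The benefit of templating on `Rep` and `Period` can be shown in isolation. Below is a minimal sketch with a hypothetical `Flag` class (not from the PR): because `waitFor` accepts any `std::chrono::duration`, callers pass whichever unit suits them without a conversion at the call site.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical helper for illustration only.
class Flag {
 public:
  void set() {
    {
      std::lock_guard<std::mutex> guard(mtx_);
      set_ = true;
    }
    cv_.notify_all();
  }

  // Accepts any std::chrono::duration, so the caller chooses the unit.
  // Returns true if the flag was set before the timeout expired.
  template <class Rep, class Period>
  bool waitFor(const std::chrono::duration<Rep, Period>& timeout) {
    std::unique_lock<std::mutex> lck(mtx_);
    return cv_.wait_for(lck, timeout, [this] { return set_; });
  }

 private:
  std::mutex mtx_;
  std::condition_variable cv_;
  bool set_ = false;
};
```

With a concrete `std::chrono::milliseconds` parameter, coarser units such as seconds would still convert implicitly, but finer ones such as microseconds would need an explicit `duration_cast`.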
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400056195 ## File path: libminifi/include/utils/MinifiConcurrentQueue.h ## @@ -0,0 +1,186 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H +#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H + +#include +#include +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + + +// Provides a queue API and guarantees no race conditions in case of multiple producers and consumers. 
+template +class ConcurrentQueue { + public: + explicit ConcurrentQueue() = default; + + ConcurrentQueue(const ConcurrentQueue& other) = delete; + ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete; + ConcurrentQueue(ConcurrentQueue&& other) +: ConcurrentQueue(std::move(other), std::lock_guard(other.mutex_)) {} + + ConcurrentQueue& operator=(ConcurrentQueue&& other) { +if (this != &other) { + std::lock(mtx_, other.mtx_); + std::lock_guard lk1(mtx_, std::adopt_lock); + std::lock_guard lk2(other.mtx_, std::adopt_lock); + queue_.swap(other.queue_); +} +return *this; + } + + bool tryDequeue(T& out) { +std::unique_lock lck(mtx_); +return tryDequeueImpl(lck, out); + } + + bool empty() const { +std::unique_lock lck(mtx_); +return queue_.emptyImpl(lck); + } + + size_t size() const { +std::lock_guard guard(mtx_); +return queue_.size(); + } + + void clear() { +std::lock_guard guard(mtx_); +queue_.clear(); + } + + template + void enqueue(Args&&... args) { +std::lock_guard guard(mtx_); +queue_.emplace_back(std::forward(args)...); + } + + private: + ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&) +: queue_( std::move(other.queue_) ) {} + + protected: + void checkLock(std::unique_lock& lck) const { +if (!lck.owns_lock()) { + throw std::logic_error("Caller of protected functions of ConcurrentQueue should own the lock!"); +} + } + + bool tryDequeueImpl(std::unique_lock& lck, T& out) { +checkLock(lck); +if (queue_.empty()) { + return false; +} +out = std::move(queue_.front()); +queue_.pop_front(); +return true; + } + + bool emptyImpl(std::unique_lock& lck) const { +checkLock(lck); +return queue_.empty(); + } + + mutable std::mutex mtx_; + private: + std::deque queue_; +}; + + +// A ConcurrentQueue extended with a condition variable to be able to block and wait for incoming data Review comment: I would like to add: - `stop` interrupts all consumers without a chance to consume remaining elements in the queue - started means queue elements can be 
consumed/dequeued.
- It's possible to enqueue elements regardless of the running state.
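The three properties listed in this comment can be illustrated with a reduced sketch. `StoppableQueue` and its `start()`/`stop()` members are assumptions made for the example; the PR's class is truncated in this thread, so its actual interface may differ.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>

// Hypothetical, reduced queue for illustration; start()/stop() are assumed names.
class StoppableQueue {
 public:
  explicit StoppableQueue(bool start = true) : running_(start) {}

  // Enqueueing is allowed in any state; consumers are only notified while running.
  void enqueue(int value) {
    bool notify = false;
    {
      std::lock_guard<std::mutex> guard(mtx_);
      items_.push_back(value);
      notify = running_;
    }
    if (notify) {
      cv_.notify_one();
    }
  }

  // Blocks until an element is available or the queue is stopped.
  // Returns false when stopped, even if elements are still queued.
  bool dequeueWait(int& out) {
    std::unique_lock<std::mutex> lck(mtx_);
    cv_.wait(lck, [this] { return !running_ || !items_.empty(); });
    if (!running_) {
      return false;
    }
    out = items_.front();
    items_.pop_front();
    return true;
  }

  // Wakes every waiting consumer; they return without consuming anything.
  void stop() {
    {
      std::lock_guard<std::mutex> guard(mtx_);
      running_ = false;
    }
    cv_.notify_all();
  }

  // Makes queued elements consumable again.
  void start() {
    {
      std::lock_guard<std::mutex> guard(mtx_);
      running_ = true;
    }
    cv_.notify_all();
  }

 private:
  std::mutex mtx_;
  std::condition_variable cv_;
  std::deque<int> items_;
  bool running_;
};
```

In this sketch `running_` is a plain `bool` that is only read or written while the mutex is held; a real implementation might use an atomic instead.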
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400043883

## File path: libminifi/include/utils/MinifiConcurrentQueue.h ##
@@ -29,11 +29,12 @@ namespace nifi {
 namespace minifi {
 namespace utils {
+
+// Provides a queue API and guarantees no race conditions in case of multiple producers and consumers.
 template <typename T>
 class ConcurrentQueue {
  public:
-  ConcurrentQueue() = default;
-  virtual ~ConcurrentQueue() = default;
+  explicit ConcurrentQueue() = default;

Review comment:
I don't see the need to make this explicit, although it doesn't hurt much. It prevents code like this:
`ConcurrentQueue make_queue() { return {}; }`
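The effect described in this comment can be reproduced in isolation. In the sketch below, `ExplicitQueue`, `ImplicitQueue` and the detection trait are hypothetical names invented for the example. Both classes have a private member so that neither is an aggregate, which matches the class under review.

```cpp
#include <cassert>
#include <deque>
#include <type_traits>

// Hypothetical stand-ins for illustration; neither type is part of the PR.
class ExplicitQueue {
 public:
  explicit ExplicitQueue() = default;
 private:
  std::deque<int> items_;
};

class ImplicitQueue {
 public:
  ImplicitQueue() = default;
 private:
  std::deque<int> items_;
};

// Returning {} copy-list-initializes the result, which needs a non-explicit constructor.
ImplicitQueue make_implicit() { return {}; }

// ExplicitQueue make_explicit() { return {}; }  // ill-formed: the chosen constructor is explicit
ExplicitQueue make_explicit() { return ExplicitQueue{}; }  // direct-initialization still works

// Detects whether `T t = {};` style initialization is valid.
template <typename T>
void acceptByCopyListInit(const T&);

template <typename T, typename = void>
struct IsImplicitlyDefaultConstructible : std::false_type {};

template <typename T>
struct IsImplicitlyDefaultConstructible<T, decltype(acceptByCopyListInit<T>({}))> : std::true_type {};

static_assert(IsImplicitlyDefaultConstructible<ImplicitQueue>::value, "{} works");
static_assert(!IsImplicitlyDefaultConstructible<ExplicitQueue>::value, "{} is rejected");
static_assert(std::is_default_constructible<ExplicitQueue>::value, "still default constructible");
```

So `explicit` on a default constructor only removes the brace-only forms (`return {};`, `T t = {};`, passing `{}` as an argument); spelling out the type continues to compile.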
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400048134

## File path: libminifi/include/utils/MinifiConcurrentQueue.h ##
@@ -81,25 +82,38 @@ class ConcurrentQueue {
     : queue_( std::move(other.queue_) ) {}

  protected:
-  bool tryDequeue(std::unique_lock<std::mutex>& lck, T& out) {
+  void checkLock(std::unique_lock<std::mutex>& lck) const {
     if (!lck.owns_lock()) {
-      throw std::logic_error("Caller of protected ConcurrentQueue::tryDequeue should own the lock!");
+      throw std::logic_error("Caller of protected functions of ConcurrentQueue should own the lock!");
     }
+  }
+
+  bool tryDequeueImpl(std::unique_lock<std::mutex>& lck, T& out) {
+    checkLock(lck);
     if (queue_.empty()) {
       return false;
     }
     out = std::move(queue_.front());
     queue_.pop_front();
     return true;
   }

Review comment:
I like that you used protected functions taking `std::unique_lock` to make it an error to use the non-locking functions without a lock.
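To make the praised pattern concrete, here is a reduced sketch of how the lock-taking protected functions catch misuse from a derived class. `IntQueue`, `CarelessQueue` and their members are hypothetical names for illustration, not the PR's code.

```cpp
#include <cassert>
#include <deque>
#include <mutex>
#include <stdexcept>

// Hypothetical names throughout; a reduced illustration, not the PR's class.
class IntQueue {
 public:
  void push(int value) {
    std::lock_guard<std::mutex> guard(mtx_);
    items_.push_back(value);
  }

  // Public entry point: acquires the lock, then delegates to the non-locking worker.
  bool tryPop(int& out) {
    std::unique_lock<std::mutex> lck(mtx_);
    return tryPopImpl(lck, out);
  }

 protected:
  // Non-locking worker: the caller must hand over a lock that it already owns.
  bool tryPopImpl(std::unique_lock<std::mutex>& lck, int& out) {
    if (!lck.owns_lock()) {
      throw std::logic_error("caller must own the lock");
    }
    if (items_.empty()) {
      return false;
    }
    out = items_.front();
    items_.pop_front();
    return true;
  }

  mutable std::mutex mtx_;

 private:
  std::deque<int> items_;
};

// Derived class that misuses the protected API by passing a lock it never acquired.
class CarelessQueue : public IntQueue {
 public:
  bool popWithoutLocking(int& out) {
    std::unique_lock<std::mutex> lck(mtx_, std::defer_lock);
    return tryPopImpl(lck, out);  // throws std::logic_error
  }
};
```

The check only confirms that the lock object owns some mutex. A stricter variant could also compare `lck.mutex()` against `&mtx_`, at the cost of a little more code.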
[GitHub] [nifi-minifi-cpp] szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
szaszm commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400057782 ## File path: libminifi/include/utils/MinifiConcurrentQueue.h ## @@ -0,0 +1,186 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H +#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H + +#include +#include +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + + +// Provides a queue API and guarantees no race conditions in case of multiple producers and consumers. 
+template +class ConcurrentQueue { + public: + explicit ConcurrentQueue() = default; + + ConcurrentQueue(const ConcurrentQueue& other) = delete; + ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete; + ConcurrentQueue(ConcurrentQueue&& other) +: ConcurrentQueue(std::move(other), std::lock_guard(other.mutex_)) {} + + ConcurrentQueue& operator=(ConcurrentQueue&& other) { +if (this != &other) { + std::lock(mtx_, other.mtx_); + std::lock_guard lk1(mtx_, std::adopt_lock); + std::lock_guard lk2(other.mtx_, std::adopt_lock); + queue_.swap(other.queue_); +} +return *this; + } + + bool tryDequeue(T& out) { +std::unique_lock lck(mtx_); +return tryDequeueImpl(lck, out); + } + + bool empty() const { +std::unique_lock lck(mtx_); +return queue_.emptyImpl(lck); + } + + size_t size() const { +std::lock_guard guard(mtx_); +return queue_.size(); + } + + void clear() { +std::lock_guard guard(mtx_); +queue_.clear(); + } + + template + void enqueue(Args&&... args) { +std::lock_guard guard(mtx_); +queue_.emplace_back(std::forward(args)...); + } + + private: + ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&) +: queue_( std::move(other.queue_) ) {} + + protected: + void checkLock(std::unique_lock& lck) const { +if (!lck.owns_lock()) { + throw std::logic_error("Caller of protected functions of ConcurrentQueue should own the lock!"); +} + } + + bool tryDequeueImpl(std::unique_lock& lck, T& out) { +checkLock(lck); +if (queue_.empty()) { + return false; +} +out = std::move(queue_.front()); +queue_.pop_front(); +return true; + } + + bool emptyImpl(std::unique_lock& lck) const { +checkLock(lck); +return queue_.empty(); + } + + mutable std::mutex mtx_; + private: + std::deque queue_; +}; + + +// A ConcurrentQueue extended with a condition variable to be able to block and wait for incoming data +template +class ConditionConcurrentQueue : private ConcurrentQueue { + public: + explicit ConditionConcurrentQueue(bool start = true) : ConcurrentQueue{}, running_{start} {} + + 
ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete; + ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = delete; + ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete; + ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = delete; + + using ConcurrentQueue::size; + using ConcurrentQueue::empty; + using ConcurrentQueue::clear; + + + template + void enqueue(Args&&... args) { +ConcurrentQueue::enqueue(std::forward(args)...); +if (running_) { + cv_.notify_one(); +} + } + + bool dequeueWait(T& out) { +std::unique_lock lck(this->mtx_); +cv_.wait(lck, [this, &lck]{ return !running_ || !this->emptyImpl(lck); }); // Only wake up if there is something to return or stopped +return running_ && ConcurrentQueue::tryDequeueImpl(lck, out); + } + + template< class Rep, class Period > + bool dequeueWaitFor(T& out, const std::chrono::duration& time) { +std::unique_lock lck(this->mtx_); +cv_.wait_for(lck, time, [this, &lck]{ return !running_ || !this->emptyImpl(lck); }); // Wake up with timeout or in case there is something to do +return running_ && ConcurrentQueue::tryDequeueImpl(lck, out); + } + + bool tryDequeue(T& out) { +std::unique_lock lck(this->mtx_); +return running_ && ConcurrentQueu
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400041324 ## File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp ## @@ -0,0 +1,158 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include "../TestBase.h" +#include "utils/MinifiConcurrentQueue.h" +#include "utils/StringUtils.h" + +using namespace org::apache::nifi::minifi::utils; + +TEST_CASE("TestConqurrentQueue::testQueue", "[TestQueue]") { + utils::ConcurrentQueue queue; + std::vector results; + + std::thread producer([&queue]() { + queue.enqueue("ba"); + std::this_thread::sleep_for(std::chrono::milliseconds(3)); Review comment: In case there would be no sleeps here, producer most probably just inserts everything before the consumer is even started. Which would also make the last testcases useless, where the goal is to prove that elements inserted later (when some are already looped in the queue) also get to a consumer. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
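The timing argument in the comment above can be sketched in isolation. This is only an illustration under assumptions: `SketchQueue` and `runProducerConsumer` are hypothetical names standing in for `utils::ConcurrentQueue` and the Catch test body, and the delays simply mirror the 3 ms / 1 ms values in the quoted test.

```cpp
#include <cassert>
#include <chrono>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for utils::ConcurrentQueue (not the PR's class).
class SketchQueue {
 public:
  void enqueue(std::string value) {
    std::lock_guard<std::mutex> guard(mtx_);
    queue_.push_back(std::move(value));
  }

  // Non-blocking: returns false when nothing is queued right now.
  bool tryDequeue(std::string& out) {
    std::lock_guard<std::mutex> guard(mtx_);
    if (queue_.empty()) {
      return false;
    }
    out = std::move(queue_.front());
    queue_.pop_front();
    return true;
  }

 private:
  std::deque<std::string> queue_;
  std::mutex mtx_;
};

// One producer, one consumer. The producer pauses after each insert so that
// later items arrive while the consumer is already polling.
std::vector<std::string> runProducerConsumer(std::chrono::milliseconds producerDelay) {
  SketchQueue queue;
  std::vector<std::string> results;

  std::thread producer([&queue, producerDelay]() {
    const char* items[] = {"ba", "dum", "tss"};
    for (const char* item : items) {
      queue.enqueue(item);
      std::this_thread::sleep_for(producerDelay);
    }
  });

  std::thread consumer([&queue, &results]() {
    while (results.size() < 3) {
      std::string s;
      if (queue.tryDequeue(s)) {
        results.push_back(std::move(s));
      } else {
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
      }
    }
  });

  producer.join();
  consumer.join();
  return results;  // only the consumer wrote to it, and it has been joined
}
```

Calling `runProducerConsumer(std::chrono::milliseconds(3))` yields the three items in insertion order. With a zero delay the result is the same, but the consumer would likely find everything already queued, which is the situation the comment wants the test to avoid.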
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400039598

## File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp ##
@@ -0,0 +1,158 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include
+#include
+#include
+#include
+#include
+
+#include "../TestBase.h"
+#include "utils/MinifiConcurrentQueue.h"
+#include "utils/StringUtils.h"
+
+using namespace org::apache::nifi::minifi::utils;

Review comment:
Did the 2nd.
Btw, I don't think using directives are bad in test .cpp files. In headers that get included in production code I would definitely avoid them as well.
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400040217 ## File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp ## @@ -0,0 +1,158 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include +#include +#include +#include +#include + +#include "../TestBase.h" +#include "utils/MinifiConcurrentQueue.h" +#include "utils/StringUtils.h" + +using namespace org::apache::nifi::minifi::utils; + +TEST_CASE("TestConqurrentQueue::testQueue", "[TestQueue]") { + utils::ConcurrentQueue queue; + std::vector results; + + std::thread producer([&queue]() { + queue.enqueue("ba"); + std::this_thread::sleep_for(std::chrono::milliseconds(3)); + queue.enqueue("dum"); + std::this_thread::sleep_for(std::chrono::milliseconds(3)); + queue.enqueue("tss"); +}); + + std::thread consumer([&queue, &results]() { + while (results.size() < 3) { + std::string s; + if (queue.tryDequeue(s)) { + results.push_back(s); + } else { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + } +}); + + producer.join(); + consumer.join(); + + REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss"); +} + + +TEST_CASE("TestConditionConqurrentQueue::testQueue", "[TestConditionQueue]") { + utils::ConditionConcurrentQueue queue(true); + std::vector results; + + std::thread producer([&queue]() { +queue.enqueue("ba"); +std::this_thread::sleep_for(std::chrono::milliseconds(3)); +queue.enqueue("dum"); +std::this_thread::sleep_for(std::chrono::milliseconds(3)); +queue.enqueue("tss"); + }); + + std::thread consumer([&queue, &results]() { +std::string s; +while (queue.dequeue(s)) { + results.push_back(s); +} + }); + + producer.join(); + + queue.stop(); + + consumer.join(); + + REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss"); +} + + +/* In this testcase the consumer thread puts back all items to the queue to consume again + * Even in this case the ones inserted later by the producer should be consumed */ +TEST_CASE("TestConqurrentQueue::testQueueWithReAdd", "[TestQueueWithReAdd]") { + utils::ConcurrentQueue queue; + std::set results; + + std::thread producer([&queue]() { + queue.enqueue("ba"); + std::this_thread::sleep_for(std::chrono::milliseconds(3)); + 
queue.enqueue("dum"); + std::this_thread::sleep_for(std::chrono::milliseconds(3)); + queue.enqueue("tss"); +}); Review comment: Fixed This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400039479

## File path: libminifi/src/utils/ThreadPool.cpp ##
@@ -64,8 +64,10 @@ void ThreadPool::run_tasks(std::shared_ptr thread) {
         }
       }
     } else {
-      std::unique_lock lock(worker_queue_mutex_);
-      tasks_available_.wait(lock);
+      // This means that the threadpool is running, but the ConcurrentQueue is stopped -> shouldn't happen
+      if (running_.load()) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(1));
+      }

Review comment:
There is no logger here. It's not a critical error that is worth an exception/dump, and it might happen during startup/shutdown for a very short time.
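The fallback branch discussed above might look roughly like this outside the ThreadPool. A sketch under assumptions: `StoppableQueue`, `workerLoop`, and `runBackoffDemo` are hypothetical names, and the queue here is a simplified stand-in whose `dequeue` returns false once it has been stopped.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>

// Hypothetical blocking queue: dequeue() returns false once stop() was called.
class StoppableQueue {
 public:
  void enqueue(int value) {
    {
      std::lock_guard<std::mutex> guard(mtx_);
      queue_.push_back(value);
    }
    cv_.notify_one();
  }

  void stop() {
    {
      std::lock_guard<std::mutex> guard(mtx_);
      running_ = false;
    }
    cv_.notify_all();
  }

  bool empty() const {
    std::lock_guard<std::mutex> guard(mtx_);
    return queue_.empty();
  }

  bool dequeue(int& out) {
    std::unique_lock<std::mutex> lck(mtx_);
    cv_.wait(lck, [this] { return !running_ || !queue_.empty(); });
    if (!running_) {
      return false;
    }
    out = queue_.front();
    queue_.pop_front();
    return true;
  }

 private:
  std::deque<int> queue_;
  mutable std::mutex mtx_;
  std::condition_variable cv_;
  bool running_ = true;
};

// If the pool is still running but the queue hands out nothing, back off for
// a millisecond instead of spinning, logging, or throwing.
int workerLoop(StoppableQueue& queue, const std::atomic<bool>& poolRunning) {
  int processed = 0;
  while (poolRunning.load()) {
    int task = 0;
    if (queue.dequeue(task)) {
      ++processed;
    } else if (poolRunning.load()) {
      std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
  }
  return processed;
}

// Drives the loop through the "queue stopped, pool still running" window.
int runBackoffDemo() {
  StoppableQueue queue;
  std::atomic<bool> poolRunning{true};
  int processed = 0;
  queue.enqueue(1);
  queue.enqueue(2);
  std::thread worker([&]() { processed = workerLoop(queue, poolRunning); });
  while (!queue.empty()) {
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }
  queue.stop();  // worker now hits the back-off branch
  std::this_thread::sleep_for(std::chrono::milliseconds(5));
  poolRunning.store(false);
  worker.join();
  return processed;
}
```

The short sleep keeps the worker from busy-looping during the brief startup/shutdown window the comment mentions, without treating that window as an error.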
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400038906 ## File path: libminifi/test/unit/MinifiConcurrentQueueTests.cpp ## @@ -0,0 +1,158 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include +#include +#include +#include +#include + +#include "../TestBase.h" +#include "utils/MinifiConcurrentQueue.h" +#include "utils/StringUtils.h" + +using namespace org::apache::nifi::minifi::utils; + +TEST_CASE("TestConqurrentQueue::testQueue", "[TestQueue]") { + utils::ConcurrentQueue queue; + std::vector results; + + std::thread producer([&queue]() { + queue.enqueue("ba"); + std::this_thread::sleep_for(std::chrono::milliseconds(3)); + queue.enqueue("dum"); + std::this_thread::sleep_for(std::chrono::milliseconds(3)); + queue.enqueue("tss"); +}); + + std::thread consumer([&queue, &results]() { + while (results.size() < 3) { + std::string s; + if (queue.tryDequeue(s)) { + results.push_back(s); + } else { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + } +}); + + producer.join(); + consumer.join(); + + REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss"); +} + + +TEST_CASE("TestConditionConqurrentQueue::testQueue", "[TestConditionQueue]") { + utils::ConditionConcurrentQueue queue(true); + std::vector results; + + std::thread producer([&queue]() { +queue.enqueue("ba"); +std::this_thread::sleep_for(std::chrono::milliseconds(3)); +queue.enqueue("dum"); +std::this_thread::sleep_for(std::chrono::milliseconds(3)); +queue.enqueue("tss"); + }); + + std::thread consumer([&queue, &results]() { +std::string s; +while (queue.dequeue(s)) { + results.push_back(s); +} + }); + + producer.join(); + + queue.stop(); + + consumer.join(); + + REQUIRE(utils::StringUtils::join("-", results) == "ba-dum-tss"); +} + + +/* In this testcase the consumer thread puts back all items to the queue to consume again + * Even in this case the ones inserted later by the producer should be consumed */ +TEST_CASE("TestConqurrentQueue::testQueueWithReAdd", "[TestQueueWithReAdd]") { + utils::ConcurrentQueue queue; + std::set results; + + std::thread producer([&queue]() { + queue.enqueue("ba"); + std::this_thread::sleep_for(std::chrono::milliseconds(3)); + 
queue.enqueue("dum"); + std::this_thread::sleep_for(std::chrono::milliseconds(3)); + queue.enqueue("tss"); +}); + + std::thread consumer([&queue, &results]() { +while (results.size() < 3) { + std::string s; + if (queue.tryDequeue(s)) { +results.insert(s); +queue.enqueue(std::move(s)); + } else { +std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } +} + }); + + producer.join(); + + // Give some time for the consumer to loop over the queue + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + + consumer.join(); Review comment: Good point, removed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400038743 ## File path: libminifi/include/utils/MinifiConcurrentQueue.h ## @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H +#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H + +#include +#include +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +template +class ConcurrentQueue { + public: + ConcurrentQueue() = default; + virtual ~ConcurrentQueue() = default; + + ConcurrentQueue(const ConcurrentQueue& other) = delete; + ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete; + ConcurrentQueue(ConcurrentQueue&& other) +: ConcurrentQueue(std::move(other), std::lock_guard(other.mutex_)) {} + + ConcurrentQueue& operator=(ConcurrentQueue&& other) { +if (this != &other) { + std::lock(mtx_, other.mtx_); + std::lock_guard lk1(mtx_, std::adopt_lock); + std::lock_guard lk2(other.mtx_, std::adopt_lock); + queue_.swap(other.queue_); +} +return *this; + } + + bool tryDequeue(T& out) { +std::unique_lock lck(mtx_); +return tryDequeue(lck, out); + } + + bool empty() const { +std::lock_guard guard(mtx_); +return queue_.empty(); + } + + size_t size() const { +std::lock_guard guard(mtx_); +return queue_.size(); + } + + void clear() { +std::lock_guard guard(mtx_); +queue_.clear(); + } + + template + void enqueue(Args&&... args) { +std::lock_guard guard(mtx_); +queue_.emplace_back(std::forward(args)...); + } + + private: + ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&) +: queue_( std::move(other.queue_) ) {} + + protected: + bool tryDequeue(std::unique_lock& lck, T& out) { +if (!lck.owns_lock()) { + throw std::logic_error("Caller of protected ConcurrentQueue::tryDequeue should own the lock!"); +} +if (queue_.empty()) { + return false; +} +out = std::move(queue_.front()); +queue_.pop_front(); +return true; + } + std::deque queue_; + mutable std::mutex mtx_; +}; + +template +class ConditionConcurrentQueue : private ConcurrentQueue { Review comment: Added comments to explain This is an automated message from the Apache Git Service. 
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400038581 ## File path: libminifi/include/utils/MinifiConcurrentQueue.h ## @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H +#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H + +#include +#include +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +template +class ConcurrentQueue { + public: + ConcurrentQueue() = default; + virtual ~ConcurrentQueue() = default; + + ConcurrentQueue(const ConcurrentQueue& other) = delete; + ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete; + ConcurrentQueue(ConcurrentQueue&& other) +: ConcurrentQueue(std::move(other), std::lock_guard(other.mutex_)) {} + + ConcurrentQueue& operator=(ConcurrentQueue&& other) { +if (this != &other) { + std::lock(mtx_, other.mtx_); + std::lock_guard lk1(mtx_, std::adopt_lock); + std::lock_guard lk2(other.mtx_, std::adopt_lock); + queue_.swap(other.queue_); +} +return *this; + } + + bool tryDequeue(T& out) { +std::unique_lock lck(mtx_); +return tryDequeue(lck, out); + } + + bool empty() const { +std::lock_guard guard(mtx_); +return queue_.empty(); + } + + size_t size() const { +std::lock_guard guard(mtx_); +return queue_.size(); + } + + void clear() { +std::lock_guard guard(mtx_); +queue_.clear(); + } + + template + void enqueue(Args&&... 
args) { +std::lock_guard guard(mtx_); +queue_.emplace_back(std::forward(args)...); + } + + private: + ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&) +: queue_( std::move(other.queue_) ) {} + + protected: + bool tryDequeue(std::unique_lock& lck, T& out) { +if (!lck.owns_lock()) { + throw std::logic_error("Caller of protected ConcurrentQueue::tryDequeue should own the lock!"); +} +if (queue_.empty()) { + return false; +} +out = std::move(queue_.front()); +queue_.pop_front(); +return true; + } + std::deque queue_; + mutable std::mutex mtx_; +}; + +template +class ConditionConcurrentQueue : private ConcurrentQueue { + public: + ConditionConcurrentQueue(bool start = false) : ConcurrentQueue(), running_{start} {}; + + ConditionConcurrentQueue(const ConditionConcurrentQueue& other) = delete; + ConditionConcurrentQueue& operator=(const ConditionConcurrentQueue& other) = delete; + ConditionConcurrentQueue(ConditionConcurrentQueue&& other) = delete; + ConditionConcurrentQueue& operator=(ConditionConcurrentQueue&& other) = delete; + + using ConcurrentQueue::size; + using ConcurrentQueue::empty; + using ConcurrentQueue::clear; + + + template + void enqueue(Args&&... args) { +ConcurrentQueue::enqueue(std::forward(args)...); +if (running_) { + cv_.notify_one(); +} + } + + bool dequeue(T& out) {

Review comment:
Added wait_for, so it now supports:
- tryDequeue
- DequeueWait
- DequeueWaitFor
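The three dequeue flavours listed in the comment could be sketched as below. The method names follow the comment, but the class name `WaitableQueue`, the signatures, and the bodies are assumptions for illustration, not the code that landed in the PR.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical sketch of a queue offering non-blocking, blocking, and
// time-limited dequeue operations.
template <typename T>
class WaitableQueue {
 public:
  void enqueue(T value) {
    {
      std::lock_guard<std::mutex> guard(mtx_);
      queue_.push_back(std::move(value));
    }
    cv_.notify_one();
  }

  // Wakes every waiter; blocking dequeues return false from now on.
  void stop() {
    {
      std::lock_guard<std::mutex> guard(mtx_);
      running_ = false;
    }
    cv_.notify_all();
  }

  // Returns immediately, with false if the queue is empty.
  bool tryDequeue(T& out) {
    std::lock_guard<std::mutex> guard(mtx_);
    return popLocked(out);
  }

  // Blocks until an item is available or stop() is called.
  bool dequeueWait(T& out) {
    std::unique_lock<std::mutex> lck(mtx_);
    cv_.wait(lck, [this] { return !running_ || !queue_.empty(); });
    return running_ && popLocked(out);
  }

  // Like dequeueWait, but gives up after `timeout`.
  template <typename Rep, typename Period>
  bool dequeueWaitFor(T& out, const std::chrono::duration<Rep, Period>& timeout) {
    std::unique_lock<std::mutex> lck(mtx_);
    cv_.wait_for(lck, timeout, [this] { return !running_ || !queue_.empty(); });
    return running_ && popLocked(out);
  }

 private:
  // Caller must hold mtx_.
  bool popLocked(T& out) {
    if (queue_.empty()) {
      return false;
    }
    out = std::move(queue_.front());
    queue_.pop_front();
    return true;
  }

  std::deque<T> queue_;
  std::mutex mtx_;
  std::condition_variable cv_;
  bool running_ = true;
};
```

Using the predicate overloads of `wait` and `wait_for` guards against spurious wakeups, and checking `running_` after the wait lets `stop()` release blocked consumers.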
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400038135 ## File path: libminifi/include/utils/MinifiConcurrentQueue.h ## @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H +#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H + +#include +#include +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +template +class ConcurrentQueue { + public: + ConcurrentQueue() = default; + virtual ~ConcurrentQueue() = default; + + ConcurrentQueue(const ConcurrentQueue& other) = delete; + ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete; + ConcurrentQueue(ConcurrentQueue&& other) +: ConcurrentQueue(std::move(other), std::lock_guard(other.mutex_)) {} + + ConcurrentQueue& operator=(ConcurrentQueue&& other) { +if (this != &other) { + std::lock(mtx_, other.mtx_); + std::lock_guard lk1(mtx_, std::adopt_lock); + std::lock_guard lk2(other.mtx_, std::adopt_lock); + queue_.swap(other.queue_); +} +return *this; + } + + bool tryDequeue(T& out) { +std::unique_lock lck(mtx_); +return tryDequeue(lck, out); + } + + bool empty() const { +std::lock_guard guard(mtx_); +return queue_.empty(); + } + + size_t size() const { +std::lock_guard guard(mtx_); +return queue_.size(); + } + + void clear() { +std::lock_guard guard(mtx_); +queue_.clear(); + } + + template + void enqueue(Args&&... args) { +std::lock_guard guard(mtx_); +queue_.emplace_back(std::forward(args)...); + } + + private: + ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&) +: queue_( std::move(other.queue_) ) {} + + protected: + bool tryDequeue(std::unique_lock& lck, T& out) { +if (!lck.owns_lock()) { + throw std::logic_error("Caller of protected ConcurrentQueue::tryDequeue should own the lock!"); +} +if (queue_.empty()) { + return false; +} +out = std::move(queue_.front()); +queue_.pop_front(); +return true; + } + std::deque queue_; + mutable std::mutex mtx_; Review comment: The container is now private, the mutex isn't as you have to acquire that in derived classes as well. This is an automated message from the Apache Git Service. 
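The layout described in the comment above (container private, mutex reachable from derived classes) can be illustrated with a small sketch. `BaseQueue`, `BlockingQueue`, and `tryDequeueLocked` are hypothetical names chosen for this example; only the general shape follows the quoted header.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <stdexcept>
#include <utility>

template <typename T>
class BaseQueue {
 public:
  void enqueue(T value) {
    std::lock_guard<std::mutex> guard(mtx_);
    queue_.push_back(std::move(value));
  }

  bool tryDequeue(T& out) {
    std::unique_lock<std::mutex> lck(mtx_);
    return tryDequeueLocked(lck, out);
  }

 protected:
  // For derived classes that already hold mtx_ (e.g. inside a cv wait).
  bool tryDequeueLocked(std::unique_lock<std::mutex>& lck, T& out) {
    if (!lck.owns_lock()) {
      throw std::logic_error("caller must own the lock");
    }
    if (queue_.empty()) {
      return false;
    }
    out = std::move(queue_.front());
    queue_.pop_front();
    return true;
  }

  // Protected: derived classes need it to build their own locks.
  mutable std::mutex mtx_;

 private:
  // Private: derived classes reach the container only through the helper.
  std::deque<T> queue_;
};

template <typename T>
class BlockingQueue : private BaseQueue<T> {
 public:
  void enqueue(T value) {
    BaseQueue<T>::enqueue(std::move(value));
    cv_.notify_one();
  }

  void stop() {
    {
      std::lock_guard<std::mutex> guard(this->mtx_);
      stopped_ = true;
    }
    cv_.notify_all();
  }

  // Blocks until an item was taken (true) or the queue was stopped (false).
  bool dequeueWait(T& out) {
    std::unique_lock<std::mutex> lck(this->mtx_);
    bool got = false;
    cv_.wait(lck, [&] {
      got = !stopped_ && this->tryDequeueLocked(lck, out);
      return stopped_ || got;
    });
    return got;
  }

 private:
  std::condition_variable cv_;
  bool stopped_ = false;  // guarded by the base class mutex
};
```

Because the condition variable has to wait on the same mutex that guards the container, the derived class needs access to the mutex, while the container itself can stay hidden behind the locked helper.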
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400037700 ## File path: libminifi/include/utils/MinifiConcurrentQueue.h ## @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H +#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H + +#include +#include +#include +#include +#include + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace utils { + +template +class ConcurrentQueue { + public: + ConcurrentQueue() = default; + virtual ~ConcurrentQueue() = default; + + ConcurrentQueue(const ConcurrentQueue& other) = delete; + ConcurrentQueue& operator=(const ConcurrentQueue& other) = delete; + ConcurrentQueue(ConcurrentQueue&& other) +: ConcurrentQueue(std::move(other), std::lock_guard(other.mutex_)) {} + + ConcurrentQueue& operator=(ConcurrentQueue&& other) { +if (this != &other) { + std::lock(mtx_, other.mtx_); + std::lock_guard lk1(mtx_, std::adopt_lock); + std::lock_guard lk2(other.mtx_, std::adopt_lock); + queue_.swap(other.queue_); +} +return *this; + } + + bool tryDequeue(T& out) { +std::unique_lock lck(mtx_); +return tryDequeue(lck, out); + } + + bool empty() const { +std::lock_guard guard(mtx_); +return queue_.empty(); + } + + size_t size() const { +std::lock_guard guard(mtx_); +return queue_.size(); + } + + void clear() { +std::lock_guard guard(mtx_); +queue_.clear(); + } + + template + void enqueue(Args&&... 
args) { +std::lock_guard guard(mtx_); +queue_.emplace_back(std::forward(args)...); + } + + private: + ConcurrentQueue(ConcurrentQueue&& other, std::lock_guard&) +: queue_( std::move(other.queue_) ) {} + + protected: + bool tryDequeue(std::unique_lock& lck, T& out) { +if (!lck.owns_lock()) { + throw std::logic_error("Caller of protected ConcurrentQueue::tryDequeue should own the lock!"); +} +if (queue_.empty()) { + return false; +} +out = std::move(queue_.front()); +queue_.pop_front(); +return true; + } + std::deque queue_; + mutable std::mutex mtx_; +}; + +template +class ConditionConcurrentQueue : private ConcurrentQueue { + public: + ConditionConcurrentQueue(bool start = false) : ConcurrentQueue(), running_{start} {}; Review comment: Fixed them This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400037604

## File path: libminifi/include/utils/MinifiConcurrentQueue.h ##
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+#define LIBMINIFI_INCLUDE_CONCURRENT_QUEUE_H
+
+#include
+#include
+#include
+#include
+#include
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+
+template
+class ConcurrentQueue {
+ public:
+  ConcurrentQueue() = default;
+  virtual ~ConcurrentQueue() = default;

Review comment:
Removed
[GitHub] [nifi-minifi-cpp] arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
arpadboda commented on a change in pull request #746: MINIFICPP-1185 - Remove moodycamel::concurrentqueue from threadpool
URL: https://github.com/apache/nifi-minifi-cpp/pull/746#discussion_r400037378

## File path: libminifi/include/utils/ThreadPool.h ##
@@ -330,20 +331,18 @@ class ThreadPool {
   // integrated power manager
   std::shared_ptr thread_manager_;
   // thread queue for the recently deceased threads.
-  moodycamel::ConcurrentQueue> deceased_thread_queue_;
+  ConcurrentQueue> deceased_thread_queue_;
   // worker queue of worker objects
-  moodycamel::ConcurrentQueue> worker_queue_;
+  ConditionConcurrentQueue> worker_queue_;
   std::priority_queue, std::vector>, DelayedTaskComparator> delayed_worker_queue_;
-// notification for available work
-  std::condition_variable tasks_available_;
+// mutex to protect task status and delayed queue
+  std::mutex worker_queue_mutex_;

Review comment:
Removed