[jira] [Updated] (NIFI-5755) Allow PutParquet processor to dynamically set config properties for parquet lib
[ https://issues.apache.org/jira/browse/NIFI-5755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ken Tore Tallakstad updated NIFI-5755:
--------------------------------------
    Description: 
PutParquet (NiFi 1.7.1) will fail to write Parquet if incoming records contain arrays with null elements.

Schema snippet example:
{ "name": "myfield", "type": ["null", { "type": "array", "items": ["null", "string"], "default": null }], "default": null },

And a corresponding data example:
"myfield" : [ null, "value1" ],
"myfield" : [ null, "value2" ],
"myfield" : [ "value3", null, "value4" ],

Avro does not seem to have a problem with this, but PutParquet fails with the following error: "Array contains a null element at X".

There is a Parquet config to allow this:
parquet.avro.write-old-list-structure=false

A checkbox for toggling this value, or a text box for passing configs in general, would be great!

    was:
PutParquet (NiFi 1.7.1) will fail to write Parquet if incoming records contain arrays with null elements.

Schema snippet example:
{ "name": "myfield", "type": ["null", { "type": "array", "items": ["null", "string"], "default": null }], "default": null },

And a corresponding data example:
"myfield" : [ null, "value1" ],
"myfield" : [ null, "value2" ],
"myfield" : [ "value3", null, "value4" ],

Avro does not seem to have a problem with this, but PutParquet fails with the following error: "Array contains a null element at X".

There is a Parquet config to allow this:
parquet.avro.write-old-list-structure=false

A checkbox for toggling this value, or a text box for passing configs in general, would be great!

!parq2.png!
> Allow PutParquet processor to dynamically set config properties for parquet lib
> ---
>
>                 Key: NIFI-5755
>                 URL: https://issues.apache.org/jira/browse/NIFI-5755
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Core Framework
>    Affects Versions: 1.7.1
>            Reporter: Ken Tore Tallakstad
>            Priority: Major
>         Attachments: parq2.png
>

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
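[Editorial note: `parquet.avro.write-old-list-structure` is a plain Hadoop/Parquet configuration key. Until the processor exposes it as a property, one conceivable route is a Hadoop configuration resource file. The snippet below is a hypothetical, untested sketch in core-site.xml style; whether PutParquet's configured Hadoop resources reach the Avro write support is an assumption, which is exactly why the reporter asks for a dedicated processor property.]

```xml
<!-- Hypothetical Hadoop configuration resource (core-site.xml style).
     Setting this key to false selects the new Parquet list structure,
     which permits null array elements. -->
<configuration>
  <property>
    <name>parquet.avro.write-old-list-structure</name>
    <value>false</value>
  </property>
</configuration>
```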
[jira] [Updated] (NIFI-5755) Allow PutParquet processor to dynamically set config properties for parquet lib
[ https://issues.apache.org/jira/browse/NIFI-5755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ken Tore Tallakstad updated NIFI-5755:
--------------------------------------
    Attachment: parq2.png

> Allow PutParquet processor to dynamically set config properties for parquet lib
> ---
>
>                 Key: NIFI-5755
>                 URL: https://issues.apache.org/jira/browse/NIFI-5755
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Core Framework
>    Affects Versions: 1.7.1
>            Reporter: Ken Tore Tallakstad
>            Priority: Major
>         Attachments: parq2.png
>

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (NIFI-5755) Allow PutParquet processor to dynamically set config properties for parquet lib
Ken Tore Tallakstad created NIFI-5755:
-------------------------------------
             Summary: Allow PutParquet processor to dynamically set config properties for parquet lib
                 Key: NIFI-5755
                 URL: https://issues.apache.org/jira/browse/NIFI-5755
             Project: Apache NiFi
          Issue Type: New Feature
          Components: Core Framework
    Affects Versions: 1.7.1
            Reporter: Ken Tore Tallakstad

!parq2.png!

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-5752) Load balancing fails with wildcard certs
[ https://issues.apache.org/jira/browse/NIFI-5752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16664544#comment-16664544 ]

ASF GitHub Bot commented on NIFI-5752:
--------------------------------------

Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/3110#discussion_r228385558

    --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ClusterLoadBalanceAuthorizer.java ---
    @@ -40,28 +42,23 @@ public ClusterLoadBalanceAuthorizer(final ClusterCoordinator clusterCoordinator,
         }
     
         @Override
    -    public void authorize(final Collection<String> clientIdentities) throws NotAuthorizedException {
    -        if (clientIdentities == null) {
    -            logger.debug("Client Identities is null, so assuming that Load Balancing communications are not secure. Authorizing client to participate in Load Balancing");
    -            return;
    -        }
    -
    -        final Set<String> nodeIds = clusterCoordinator.getNodeIdentifiers().stream()
    +    public void authorize(final SSLSession sslSession) throws NotAuthorizedException {
    +        final List<String> nodeIds = clusterCoordinator.getNodeIdentifiers().stream()
                 .map(NodeIdentifier::getApiAddress)
    -            .collect(Collectors.toSet());
    +            .collect(Collectors.toList());
     
    -        for (final String clientId : clientIdentities) {
    -            if (nodeIds.contains(clientId)) {
    -                logger.debug("Client ID '{}' is in the list of Nodes in the Cluster. Authorizing Client to Load Balance data", clientId);
    +        for (final String nodeId : nodeIds) {
    +            final HostnameVerifier verifier = new DefaultHostnameVerifier();
    --- End diff --

    I think HostnameVerifier is thread-safe and can be an instance field instead of being created at each verification.

> Load balancing fails with wildcard certs
> ---
>
>                 Key: NIFI-5752
>                 URL: https://issues.apache.org/jira/browse/NIFI-5752
>             Project: Apache NiFi
>          Issue Type: Bug
>    Affects Versions: 1.8.0
>            Reporter: Kotaro Terada
>            Priority: Major
>
> Load balancing fails when we construct a secure cluster with wildcard certs.
> For example, assume that we have a valid wildcard cert for {{*.example.com}} and a cluster consisting of {{nf1.example.com}}, {{nf2.example.com}}, and {{nf3.example.com}}. We cannot transfer a FlowFile between nodes for load balancing because of the following authorization error:
> {noformat}
> 2018-10-25 19:05:13,520 WARN [Load Balance Server Thread-2] o.a.n.c.q.c.s.ClusterLoadBalanceAuthorizer Authorization failed for Client ID's [*.example.com] to Load Balance data because none of the ID's are known Cluster Node Identifiers
> 2018-10-25 19:05:13,521 ERROR [Load Balance Server Thread-2] o.a.n.c.q.c.s.ConnectionLoadBalanceServer Failed to communicate with Peer /xxx.xxx.xxx.xxx:x
> org.apache.nifi.controller.queue.clustered.server.NotAuthorizedException: Client ID's [*.example.com] are not authorized to Load Balance data
>     at org.apache.nifi.controller.queue.clustered.server.ClusterLoadBalanceAuthorizer.authorize(ClusterLoadBalanceAuthorizer.java:65)
>     at org.apache.nifi.controller.queue.clustered.server.StandardLoadBalanceProtocol.receiveFlowFiles(StandardLoadBalanceProtocol.java:142)
>     at org.apache.nifi.controller.queue.clustered.server.ConnectionLoadBalanceServer$CommunicateAction.run(ConnectionLoadBalanceServer.java:176)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> {noformat}
> This problem occurs because, in the {{authorize}} method of the {{ClusterLoadBalanceAuthorizer}} class, authorization is attempted by simply matching strings.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
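[Editorial note: the failure mode above can be illustrated with a small self-contained sketch. This is NOT the NiFi implementation (the actual fix delegates to a `HostnameVerifier`); the class and helper names below are hypothetical, and the wildcard rule is the usual single-label match from RFC 6125.]

```java
import java.util.List;

public class WildcardMatchDemo {

    // Plain string matching, as in the failing authorize() logic:
    // a cert identity of "*.example.com" never equals any node ID.
    static boolean exactMatch(String certIdentity, List<String> nodeIds) {
        return nodeIds.contains(certIdentity);
    }

    // A minimal wildcard-aware check: "*.example.com" matches exactly
    // one additional DNS label, e.g. "nf1.example.com".
    static boolean wildcardMatch(String certIdentity, String hostname) {
        if (!certIdentity.startsWith("*.")) {
            return certIdentity.equalsIgnoreCase(hostname);
        }
        final String suffix = certIdentity.substring(1); // ".example.com"
        final int firstDot = hostname.indexOf('.');
        // The wildcard must cover exactly one label: everything after the
        // first dot must equal the suffix, with nothing left over.
        return firstDot > 0
                && hostname.length() - firstDot == suffix.length()
                && hostname.regionMatches(true, firstDot, suffix, 0, suffix.length());
    }

    public static void main(String[] args) {
        List<String> nodeIds = List.of("nf1.example.com", "nf2.example.com", "nf3.example.com");
        System.out.println(exactMatch("*.example.com", nodeIds));             // false -> NotAuthorizedException
        System.out.println(wildcardMatch("*.example.com", "nf1.example.com")); // true
    }
}
```

The first check mirrors the log message in the issue: the wildcard identity is not among the known node identifiers, so authorization fails even though the cert is valid for every node.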
[jira] [Commented] (NIFI-5752) Load balancing fails with wildcard certs
[ https://issues.apache.org/jira/browse/NIFI-5752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16664546#comment-16664546 ]

ASF GitHub Bot commented on NIFI-5752:
--------------------------------------

Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/3110#discussion_r228386235

    --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ClusterLoadBalanceAuthorizer.java ---
    @@ -40,28 +42,23 @@ public ClusterLoadBalanceAuthorizer(final ClusterCoordinator clusterCoordinator,
         }
     
         @Override
    -    public void authorize(final Collection<String> clientIdentities) throws NotAuthorizedException {
    -        if (clientIdentities == null) {
    -            logger.debug("Client Identities is null, so assuming that Load Balancing communications are not secure. Authorizing client to participate in Load Balancing");
    -            return;
    -        }
    -
    -        final Set<String> nodeIds = clusterCoordinator.getNodeIdentifiers().stream()
    +    public void authorize(final SSLSession sslSession) throws NotAuthorizedException {
    +        final List<String> nodeIds = clusterCoordinator.getNodeIdentifiers().stream()
                 .map(NodeIdentifier::getApiAddress)
    -            .collect(Collectors.toSet());
    +            .collect(Collectors.toList());
     
    -        for (final String clientId : clientIdentities) {
    -            if (nodeIds.contains(clientId)) {
    -                logger.debug("Client ID '{}' is in the list of Nodes in the Cluster. Authorizing Client to Load Balance data", clientId);
    +        for (final String nodeId : nodeIds) {
    +            final HostnameVerifier verifier = new DefaultHostnameVerifier();
    +            if (verifier.verify(nodeId, sslSession)) {
    +                logger.debug("Authorizing Client to Load Balance data");
    --- End diff --

    In the case where the cert contains the exact nodeId, the `nodeId` is still informative to log. I'd suggest a logging message something like:

    ```suggestion
                    logger.debug("The request was verified with node ID '{}'. Authorizing Client to Load Balance data", nodeId);
    ```

> Load balancing fails with wildcard certs
> ---
>
>                 Key: NIFI-5752
>                 URL: https://issues.apache.org/jira/browse/NIFI-5752
>             Project: Apache NiFi
>          Issue Type: Bug
>    Affects Versions: 1.8.0
>            Reporter: Kotaro Terada
>            Priority: Major
>

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-5752) Load balancing fails with wildcard certs
[ https://issues.apache.org/jira/browse/NIFI-5752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16664543#comment-16664543 ]

ASF GitHub Bot commented on NIFI-5752:
--------------------------------------

Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/3110#discussion_r228384110

    --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ClusterLoadBalanceAuthorizer.java ---
    @@ -40,28 +42,23 @@ public ClusterLoadBalanceAuthorizer(final ClusterCoordinator clusterCoordinator,
         }
     
         @Override
    -    public void authorize(final Collection<String> clientIdentities) throws NotAuthorizedException {
    -        if (clientIdentities == null) {
    -            logger.debug("Client Identities is null, so assuming that Load Balancing communications are not secure. Authorizing client to participate in Load Balancing");
    -            return;
    -        }
    -
    -        final Set<String> nodeIds = clusterCoordinator.getNodeIdentifiers().stream()
    +    public void authorize(final SSLSession sslSession) throws NotAuthorizedException {
    +        final List<String> nodeIds = clusterCoordinator.getNodeIdentifiers().stream()
                 .map(NodeIdentifier::getApiAddress)
    -            .collect(Collectors.toSet());
    +            .collect(Collectors.toList());
    --- End diff --

    Is there any reason to use `toList` instead?

> Load balancing fails with wildcard certs
> ---
>
>                 Key: NIFI-5752
>                 URL: https://issues.apache.org/jira/browse/NIFI-5752
>             Project: Apache NiFi
>          Issue Type: Bug
>    Affects Versions: 1.8.0
>            Reporter: Kotaro Terada
>            Priority: Major
>

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-5752) Load balancing fails with wildcard certs
[ https://issues.apache.org/jira/browse/NIFI-5752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16664545#comment-16664545 ]

ASF GitHub Bot commented on NIFI-5752:
--------------------------------------

Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/3110#discussion_r228387841

    --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ClusterLoadBalanceAuthorizer.java ---
    @@ -40,28 +42,23 @@ public ClusterLoadBalanceAuthorizer(final ClusterCoordinator clusterCoordinator,
         }
     
         @Override
    -    public void authorize(final Collection<String> clientIdentities) throws NotAuthorizedException {
    -        if (clientIdentities == null) {
    -            logger.debug("Client Identities is null, so assuming that Load Balancing communications are not secure. Authorizing client to participate in Load Balancing");
    -            return;
    -        }
    -
    -        final Set<String> nodeIds = clusterCoordinator.getNodeIdentifiers().stream()
    +    public void authorize(final SSLSession sslSession) throws NotAuthorizedException {
    +        final List<String> nodeIds = clusterCoordinator.getNodeIdentifiers().stream()
                 .map(NodeIdentifier::getApiAddress)
    -            .collect(Collectors.toSet());
    +            .collect(Collectors.toList());
     
    -        for (final String clientId : clientIdentities) {
    -            if (nodeIds.contains(clientId)) {
    -                logger.debug("Client ID '{}' is in the list of Nodes in the Cluster. Authorizing Client to Load Balance data", clientId);
    +        for (final String nodeId : nodeIds) {
    +            final HostnameVerifier verifier = new DefaultHostnameVerifier();
    +            if (verifier.verify(nodeId, sslSession)) {
    +                logger.debug("Authorizing Client to Load Balance data");
                     return;
    --- End diff --

    By #3109, we need to return the client peer description when authorization passes. For the most informative result for data provenance, we need to:

    - If any SAN exists in the known nodeIds, return the matched SAN (the existing code can do this); this way we can best identify which node sent the request. (If the cert contains multiple nodeIds as SANs, this logic can break, but I believe that is a corner case we don't need to support.)
    - If no SAN matches any nodeId, use the hostname verifier to support wildcard certs. In this case, return the hostname derived from the socket address.

    Alternatively, we could just use the hostname verifier and use the hostname derived from the socket address in every case for provenance data. What do you think, @markap14?

> Load balancing fails with wildcard certs
> ---
>
>                 Key: NIFI-5752
>                 URL: https://issues.apache.org/jira/browse/NIFI-5752
>             Project: Apache NiFi
>          Issue Type: Bug
>    Affects Versions: 1.8.0
>            Reporter: Kotaro Terada
>            Priority: Major
>
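[Editorial note: the two-step authorization order proposed in the comment above can be sketched in isolation. This is a hypothetical illustration, not NiFi code: `PeerAuthorizationSketch` and its parameter names are invented, and a `BiPredicate` stands in for the `DefaultHostnameVerifier` that the actual patch uses against the `SSLSession`.]

```java
import java.util.List;
import java.util.Optional;
import java.util.function.BiPredicate;

public class PeerAuthorizationSketch {

    /**
     * Sketch of the reviewer's proposal: prefer an exact SAN match against the
     * known cluster node IDs (identifies the sending node precisely for
     * provenance), and only then fall back to a wildcard-capable hostname
     * check, in which case the socket-derived hostname is all we can report.
     * Returns the peer description, or empty if the client is not authorized.
     */
    static Optional<String> authorize(List<String> certSans,
                                      List<String> knownNodeIds,
                                      BiPredicate<String, List<String>> wildcardCheck,
                                      String socketHostname) {
        // Step 1: exact SAN match -> best provenance (the matched node ID).
        for (String san : certSans) {
            if (knownNodeIds.contains(san)) {
                return Optional.of(san);
            }
        }
        // Step 2: wildcard-aware verification (a HostnameVerifier in NiFi);
        // fall back to the hostname derived from the socket address.
        for (String nodeId : knownNodeIds) {
            if (wildcardCheck.test(nodeId, certSans)) {
                return Optional.of(socketHostname);
            }
        }
        return Optional.empty(); // not authorized
    }
}
```

With a wildcard cert for `*.example.com`, step 1 never matches, so the returned peer description degrades to the socket hostname; with a per-node cert, step 1 returns the exact node ID, which is the "best informative result" the comment describes.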
[jira] [Commented] (NIFI-5744) Put exception message to attribute while ExecuteSQL fails
[ https://issues.apache.org/jira/browse/NIFI-5744?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16664528#comment-16664528 ]

ASF GitHub Bot commented on NIFI-5744:
--------------------------------------

Github user mattyb149 commented on the issue:

    https://github.com/apache/nifi/pull/3107

    Just to connect the dots here: there is some dissent from the PMC about this approach, so we should do our best to reach consensus on the mailing list before this PR is merged, in case the direction changes (e.g., a switch from an attribute to additional outgoing relationships).

> Put exception message to attribute while ExecuteSQL fails
> ---
>
>                 Key: NIFI-5744
>                 URL: https://issues.apache.org/jira/browse/NIFI-5744
>             Project: Apache NiFi
>          Issue Type: Improvement
>    Affects Versions: 1.7.1
>            Reporter: Deon Huang
>            Assignee: Deon Huang
>            Priority: Minor
>
> In some scenarios, it would be great if we could have different behavior based on the exception, and better error tracking afterwards in attribute form instead of only in the log.
> For example, if it's a connection-refused exception due to a wrong URL, we won't want to retry, and an error-message attribute would be helpful to keep track of it.
> Whereas in another scenario where the database is temporarily unavailable, we should retry based on the exception.
> This should be a quick fix in AbstractExecuteSQL before transferring the flowfile to the failure relationship:
> {code:java}
> session.transfer(fileToProcess, REL_FAILURE);
> {code}

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-5744) Put exception message to attribute while ExecuteSQL fails
[ https://issues.apache.org/jira/browse/NIFI-5744?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16664512#comment-16664512 ] ASF GitHub Bot commented on NIFI-5744: -- Github user patricker commented on a diff in the pull request: https://github.com/apache/nifi/pull/3107#discussion_r228384912 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java --- @@ -83,6 +83,8 @@ @WritesAttribute(attribute = "executesql.query.fetchtime", description = "Duration of the result set fetch time in milliseconds"), @WritesAttribute(attribute = "executesql.resultset.index", description = "Assuming multiple result sets are returned, " + "the zero based index of this result set."), +@WritesAttribute(attribute = "executesql.error.message", description = "If processing an incoming flow file causes " ++ "an Exception, the flow file is routed to failure and this attribute is set to the exception message."), --- End diff -- Can you capitalize "flow file" to "Flow File" in both processors? > Put exception message to attribute while ExecuteSQL fail > > > Key: NIFI-5744 > URL: https://issues.apache.org/jira/browse/NIFI-5744 > Project: Apache NiFi > Issue Type: Improvement >Affects Versions: 1.7.1 >Reporter: Deon Huang >Assignee: Deon Huang >Priority: Minor > > In some scenario, it would be great if we could have different behavior based > on exception. > Better error tracking afterwards in attribute format instead of tracking in > log. > For example, if it’s connection refused exception due to wrong url. > We won’t want to retry and error message attribute would be helpful to keep > track of. > While it’s other scenario that database temporary unavailable, we should > retry it based on should retry exception. 
> Should be a quick fix at AbstractExecuteSQL before transfer flowfile to > failure relationship > {code:java} > session.transfer(fileToProcess, REL_FAILURE); > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
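The quick fix described above, setting the exception message as a flow-file attribute before the transfer to the failure relationship, can be sketched in plain Java. The class and method names below are illustrative, not the actual AbstractExecuteSQL code, and attributes are modeled as a simple Map since the NiFi API is not on the classpath in a standalone example.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the behavior proposed in NIFI-5744: when processing fails,
// record the exception message in a flow-file attribute before routing to
// the failure relationship. "executesql.error.message" is the attribute
// name from the PR discussion.
public class ErrorAttributeSketch {

    static final String ERROR_ATTRIBUTE = "executesql.error.message";

    // Runs the given query action; on failure, returns the attributes that
    // would accompany the flow file on the failure relationship.
    static Map<String, String> runAndCaptureError(Runnable queryAction) {
        Map<String, String> attributes = new HashMap<>();
        try {
            queryAction.run();
        } catch (Exception e) {
            attributes.put(ERROR_ATTRIBUTE, e.getMessage());
            // In the real processor this would be followed by
            // session.putAllAttributes(fileToProcess, attributes) and
            // session.transfer(fileToProcess, REL_FAILURE);
        }
        return attributes;
    }

    public static void main(String[] args) {
        Map<String, String> attrs = runAndCaptureError(() -> {
            throw new IllegalStateException("Connection refused");
        });
        System.out.println(attrs.get(ERROR_ATTRIBUTE));
    }
}
```

This keeps the error visible to downstream routing (e.g. RouteOnAttribute deciding retry vs. drop) without digging through the logs, which is the motivation stated in the ticket.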
[GitHub] nifi pull request #3107: NIFI-5744: Put exception message to attribute while...
Github user patricker commented on a diff in the pull request: https://github.com/apache/nifi/pull/3107#discussion_r228384912

--- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java ---
@@ -83,6 +83,8 @@
 @WritesAttribute(attribute = "executesql.query.fetchtime", description = "Duration of the result set fetch time in milliseconds"),
 @WritesAttribute(attribute = "executesql.resultset.index", description = "Assuming multiple result sets are returned, "
         + "the zero based index of this result set."),
+@WritesAttribute(attribute = "executesql.error.message", description = "If processing an incoming flow file causes "
++        "an Exception, the flow file is routed to failure and this attribute is set to the exception message."),
--- End diff --

Can you capitalize "flow file" to "Flow File" in both processors? ---
[jira] [Updated] (NIFI-5746) The SEND and RECEIVE provenance events for load balancing do not have the same transit uri syntax
[ https://issues.apache.org/jira/browse/NIFI-5746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Koji Kawamura updated NIFI-5746: Resolution: Fixed Status: Resolved (was: Patch Available) > The SEND and RECEIVE provenance events for load balancing do not have the > same transit uri syntax > - > > Key: NIFI-5746 > URL: https://issues.apache.org/jira/browse/NIFI-5746 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Affects Versions: 1.8.0 >Reporter: Mark Payne >Assignee: Mark Payne >Priority: Major > Fix For: 1.9.0 > > > The SEND event has a transit uri like nifi:connection:<connection uuid> > The RECEIVE event has a transit uri like nifi://<node address>/loadbalance/<connection uuid> > The RECEIVE event is much preferred, as it indicates not only that the > transfer was via load balance but also includes the address of the node and > the UUID of the connection. The SEND Transit URI should be changed to > nifi://<node address>/loadbalance/<connection uuid> -- This message was sent by Atlassian JIRA (v7.6.3#76005)
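The unified transit URI syntax the ticket asks for can be sketched in a few lines. The helper class and method name below are illustrative, not NiFi's actual code; only the URI shape comes from the ticket.

```java
// Sketch of the unified load-balance transit URI described in NIFI-5746:
// both SEND and RECEIVE provenance events should use
// nifi://<node address>/loadbalance/<connection uuid>.
public class LoadBalanceTransitUri {

    // Builds the transit URI from the Node Identifier's address and the
    // connection's UUID (the two pieces of information the ticket says the
    // RECEIVE form usefully carries).
    static String build(String nodeAddress, String connectionId) {
        return String.format("nifi://%s/loadbalance/%s", nodeAddress, connectionId);
    }

    public static void main(String[] args) {
        System.out.println(build("node1.example.com:8443", "0e3b-...-uuid"));
    }
}
```

The fix in commit c7ff2fc also sources the address from the Node Identifier rather than from the socket, so SEND and RECEIVE events report the same host string.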
[jira] [Commented] (NIFI-5746) The SEND and RECEIVE provenance events for load balancing do not have the same transit uri syntax
[ https://issues.apache.org/jira/browse/NIFI-5746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16664500#comment-16664500 ] ASF GitHub Bot commented on NIFI-5746: -- Github user asfgit closed the pull request at: https://github.com/apache/nifi/pull/3109 > The SEND and RECEIVE provenance events for load balancing do not have the > same transit uri syntax > - > > Key: NIFI-5746 > URL: https://issues.apache.org/jira/browse/NIFI-5746 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Affects Versions: 1.8.0 >Reporter: Mark Payne >Assignee: Mark Payne >Priority: Major > Fix For: 1.9.0 > > > The SEND event has a transit uri like nifi:connection:<connection uuid> > The RECEIVE event has a transit uri like nifi://<node address>/loadbalance/<connection uuid> > The RECEIVE event is much preferred, as it indicates not only that the > transfer was via load balance but also includes the address of the node and > the UUID of the connection. The SEND Transit URI should be changed to > nifi://<node address>/loadbalance/<connection uuid> -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] nifi pull request #3109: NIFI-5746: Use Node Identifier's node address inste...
Github user asfgit closed the pull request at: https://github.com/apache/nifi/pull/3109 ---
[jira] [Commented] (NIFI-5746) The SEND and RECEIVE provenance events for load balancing do not have the same transit uri syntax
[ https://issues.apache.org/jira/browse/NIFI-5746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16664499#comment-16664499 ] ASF subversion and git services commented on NIFI-5746: --- Commit c7ff2fc5dba7c8aaeae07f4819c320e2a96555f0 in nifi's branch refs/heads/master from [~markap14] [ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=c7ff2fc ] NIFI-5746: Use Node Identifier's node address instead of getting from socket for RECEIVE prov events; make SEND prov events match syntax of RECEIVE prov events NIFI-5746: Addressed issue found in review process This closes #3109. Signed-off-by: Koji Kawamura > The SEND and RECEIVE provenance events for load balancing do not have the > same transit uri syntax > - > > Key: NIFI-5746 > URL: https://issues.apache.org/jira/browse/NIFI-5746 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Affects Versions: 1.8.0 >Reporter: Mark Payne >Assignee: Mark Payne >Priority: Major > Fix For: 1.9.0 > > > The SEND event has a transit uri like nifi:connection:<connection uuid> > The RECEIVE event has a transit uri like nifi://<node address>/loadbalance/<connection uuid> > The RECEIVE event is much preferred, as it indicates not only that the > transfer was via load balance but also includes the address of the node and > the UUID of the connection. The SEND Transit URI should be changed to > nifi://<node address>/loadbalance/<connection uuid> -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-5746) The SEND and RECEIVE provenance events for load balancing do not have the same transit uri syntax
[ https://issues.apache.org/jira/browse/NIFI-5746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16664496#comment-16664496 ] ASF GitHub Bot commented on NIFI-5746: -- Github user ijokarumawak commented on the issue: https://github.com/apache/nifi/pull/3109 @markap14 The update looks good to me, +1. Thanks @markap14 I'm going to merge it. > The SEND and RECEIVE provenance events for load balancing do not have the > same transit uri syntax > - > > Key: NIFI-5746 > URL: https://issues.apache.org/jira/browse/NIFI-5746 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Affects Versions: 1.8.0 >Reporter: Mark Payne >Assignee: Mark Payne >Priority: Major > Fix For: 1.9.0 > > > The SEND event has a transit uri like nifi:connection:<connection uuid> > The RECEIVE event has a transit uri like nifi://<node address>/loadbalance/<connection uuid> > The RECEIVE event is much preferred, as it indicates not only that the > transfer was via load balance but also includes the address of the node and > the UUID of the connection. The SEND Transit URI should be changed to > nifi://<node address>/loadbalance/<connection uuid> -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] nifi issue #3109: NIFI-5746: Use Node Identifier's node address instead of g...
Github user ijokarumawak commented on the issue: https://github.com/apache/nifi/pull/3109 @markap14 The update looks good to me, +1. Thanks @markap14 I'm going to merge it. ---
[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached
[ https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16664441#comment-16664441 ] ASF GitHub Bot commented on NIFI-5642: -- Github user aglotero commented on a diff in the pull request: https://github.com/apache/nifi/pull/3051#discussion_r228371811

--- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java ---
@@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext context) {
 @Override
 public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+    FlowFile inputFlowFile = null;
     FlowFile fileToProcess = null;
+
+    Map attributes = null;
+
     if (context.hasIncomingConnection()) {
-        fileToProcess = session.get();
+        inputFlowFile = session.get();

         // If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
         // However, if we have no FlowFile and we have connections coming from other Processors, then
         // we know that we should run only if we have a FlowFile.
-        if (fileToProcess == null && context.hasNonLoopConnection()) {
+        if (inputFlowFile == null && context.hasNonLoopConnection()) {
             return;
         }
+
+        attributes = inputFlowFile.getAttributes();
     }

     final ComponentLog logger = getLogger();
-    final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
-    final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS);
+    final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(inputFlowFile).getValue();
+    final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(inputFlowFile).asTimePeriod(TimeUnit.MILLISECONDS);
     final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue();
-    final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue());
+    final long maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger();
+    final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue());
     final StopWatch stopWatch = new StopWatch(true);

-    if (fileToProcess == null) {
-        fileToProcess = session.create();
+    if(inputFlowFile != null){
+        session.transfer(inputFlowFile, REL_ORIGINAL);
     }

     try {
         // The documentation for the driver recommends the session remain open the entire time the processor is running
         // and states that it is thread-safe. This is why connectionSession is not in a try-with-resources.
         final Session connectionSession = cassandraSession.get();
-        final ResultSetFuture queryFuture = connectionSession.executeAsync(selectQuery);
+        final ResultSet resultSet;
+
+        if (queryTimeout > 0) {
+            resultSet = connectionSession.execute(selectQuery, queryTimeout, TimeUnit.MILLISECONDS);
+        }else{
+            resultSet = connectionSession.execute(selectQuery);
+        }
+
         final AtomicLong nrOfRows = new AtomicLong(0L);
-        fileToProcess = session.write(fileToProcess, new OutputStreamCallback() {
-            @Override
-            public void process(final OutputStream out) throws IOException {
-                try {
-                    logger.debug("Executing CQL query {}", new Object[]{selectQuery});
-                    final ResultSet resultSet;
-                    if (queryTimeout > 0) {
-                        resultSet = queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS);
-                        if (AVRO_FORMAT.equals(outputFormat)) {
-                            nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, TimeUnit.MILLISECONDS));
-                        } else if (JSON_FORMAT.equals(outputFormat)) {
-                            nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout, TimeUnit.MILLISECONDS));
-                        }
-                    } else {
-                        resultSet =
[GitHub] nifi pull request #3051: NIFI-5642: QueryCassandra processor : output FlowFi...
Github user aglotero commented on a diff in the pull request: https://github.com/apache/nifi/pull/3051#discussion_r228371811 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java --- @@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext context) { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { +FlowFile inputFlowFile = null; FlowFile fileToProcess = null; + +Map attributes = null; + if (context.hasIncomingConnection()) { -fileToProcess = session.get(); +inputFlowFile = session.get(); // If we have no FlowFile, and all incoming connections are self-loops then we can continue on. // However, if we have no FlowFile and we have connections coming from other Processors, then // we know that we should run only if we have a FlowFile. -if (fileToProcess == null && context.hasNonLoopConnection()) { +if (inputFlowFile == null && context.hasNonLoopConnection()) { return; } + +attributes = inputFlowFile.getAttributes(); } final ComponentLog logger = getLogger(); -final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue(); -final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS); +final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(inputFlowFile).getValue(); +final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(inputFlowFile).asTimePeriod(TimeUnit.MILLISECONDS); final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue(); -final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue()); +final long maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger(); 
+final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue()); final StopWatch stopWatch = new StopWatch(true); -if (fileToProcess == null) { -fileToProcess = session.create(); +if(inputFlowFile != null){ +session.transfer(inputFlowFile, REL_ORIGINAL); } try { // The documentation for the driver recommends the session remain open the entire time the processor is running // and states that it is thread-safe. This is why connectionSession is not in a try-with-resources. final Session connectionSession = cassandraSession.get(); -final ResultSetFuture queryFuture = connectionSession.executeAsync(selectQuery); +final ResultSet resultSet; + +if (queryTimeout > 0) { +resultSet = connectionSession.execute(selectQuery, queryTimeout, TimeUnit.MILLISECONDS); +}else{ +resultSet = connectionSession.execute(selectQuery); +} + final AtomicLong nrOfRows = new AtomicLong(0L); -fileToProcess = session.write(fileToProcess, new OutputStreamCallback() { -@Override -public void process(final OutputStream out) throws IOException { -try { -logger.debug("Executing CQL query {}", new Object[]{selectQuery}); -final ResultSet resultSet; -if (queryTimeout > 0) { -resultSet = queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS); -if (AVRO_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, TimeUnit.MILLISECONDS)); -} else if (JSON_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout, TimeUnit.MILLISECONDS)); -} -} else { -resultSet = queryFuture.getUninterruptibly(); -if (AVRO_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToAvroStream(resultSet, out, 0, null)); -
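The core of the PR quoted above is the MAX_ROWS_PER_FLOW_FILE idea: flush a batch of rows downstream as soon as the cap is reached instead of buffering the whole result set into one FlowFile. The sketch below shows only that batching logic; rows are plain strings and "FlowFiles" are lists, whereas the real PR streams Cassandra Rows into Avro or JSON through the ProcessSession.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Sketch of the row-batching behavior proposed in NIFI-5642: emit a batch
// every time maxRowsPerFlowFile rows have been read. A cap of 0 means
// "no limit" (one batch for the entire result set).
public class RowBatcher {

    static List<List<String>> batchRows(Iterator<String> rows, int maxRowsPerFlowFile) {
        List<List<String>> flowFiles = new ArrayList<>();
        List<String> current = new ArrayList<>();
        while (rows.hasNext()) {
            current.add(rows.next());
            if (maxRowsPerFlowFile > 0 && current.size() >= maxRowsPerFlowFile) {
                flowFiles.add(current);      // emit this batch downstream immediately
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            flowFiles.add(current);          // final partial batch
        }
        return flowFiles;
    }

    public static void main(String[] args) {
        List<String> rows = List.of("r1", "r2", "r3", "r4", "r5");
        System.out.println(batchRows(rows.iterator(), 2)); // [[r1, r2], [r3, r4], [r5]]
    }
}
```

In the processor, each emitted batch would become its own FlowFile transferred to success, so downstream processors can start working before the full Cassandra result set has been fetched.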
[jira] [Assigned] (NIFI-5748) Improve handling of X-Forwarded-* headers in URI Rewriting
[ https://issues.apache.org/jira/browse/NIFI-5748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jeff Storck reassigned NIFI-5748: - Assignee: Jeff Storck > Improve handling of X-Forwarded-* headers in URI Rewriting > -- > > Key: NIFI-5748 > URL: https://issues.apache.org/jira/browse/NIFI-5748 > Project: Apache NiFi > Issue Type: Improvement >Reporter: Kevin Doran >Assignee: Jeff Storck >Priority: Major > > This ticket is to improve the handling of headers used by popular proxies > when rewriting URIs in NiFI. Currently, NiFi checks the following headers > when determining how to re-write URLs it returns clients: > From > [ApplicationResource|https://github.com/apache/nifi/blob/2201f7746fd16874aefbd12d546565f5d105ab04/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java#L110]: > {code:java} > public static final String PROXY_SCHEME_HTTP_HEADER = "X-ProxyScheme"; > public static final String PROXY_HOST_HTTP_HEADER = "X-ProxyHost"; > public static final String PROXY_PORT_HTTP_HEADER = "X-ProxyPort"; > public static final String PROXY_CONTEXT_PATH_HTTP_HEADER = > "X-ProxyContextPath"; > public static final String FORWARDED_PROTO_HTTP_HEADER = "X-Forwarded-Proto"; > public static final String FORWARDED_HOST_HTTP_HEADER = "X-Forwarded-Server"; > public static final String FORWARDED_PORT_HTTP_HEADER = "X-Forwarded-Port"; > public static final String FORWARDED_CONTEXT_HTTP_HEADER = > "X-Forwarded-Context"; > // ... 
> final String scheme = getFirstHeaderValue(PROXY_SCHEME_HTTP_HEADER, > FORWARDED_PROTO_HTTP_HEADER); > final String host = getFirstHeaderValue(PROXY_HOST_HTTP_HEADER, > FORWARDED_HOST_HTTP_HEADER); > final String port = getFirstHeaderValue(PROXY_PORT_HTTP_HEADER, > FORWARDED_PORT_HTTP_HEADER); > {code} > Based on this, it looks like if both {{X-Forwarded-Server}} and > {{X-Forwarded-Host}} are set, {{-Host}} will contain the hostname the > user/client requested, and {{-Server}} will contain the hostname of the proxy > server (i.e., what the proxy server is able to determine from inspecting the > hostname of the instance it is on). See this for more details: > https://stackoverflow.com/questions/43689625/x-forwarded-host-vs-x-forwarded-server > Here is a demo based on docker containers and a reverse proxy called Traefik > that shows where the current NiFi logic can break: > https://gist.github.com/kevdoran/2892004ccbfbb856115c8a756d9d4538 > To use this gist, you can run the following: > {noformat} > wget -qO- > https://gist.githubusercontent.com/kevdoran/2892004ccbfbb856115c8a756d9d4538/raw/fb72151900d4d8fdcf4919fe5c8a94805fbb8401/docker-compose.yml > | docker-compose -f - up > {noformat} > Once the environment is up, go to http://nifi.docker.localhost/nifi in Chrome > and try adding/configuring/moving a processor. This will reproduce the issue. > For this task, the following improvement is recommended: > Change the header (string literal) for FORWARDED_HOST_HTTP_HEADER from > "X-Forwarded-Server" to "X-Forwarded-Host". > Additionally, some proxies use a different header for the context path > prefix. Traefik uses {{X-Forwarded-Prefix}}. There does not appear to be a > universal standard. In the future, we could make this header configurable, > but for now, perhaps we should add {{X-Forwarded-Prefix}} to the headers > checked by NiFi so that Traefik is supported out-of-the-box. 
> *Important:* After making this change, verify that proxying NiFi via Knox > still works, i.e., Knox should be sending the X-Forwarded-Host header that > matches what the user requested in the browser. > This change applies to NiFi Registry as well. > /cc [~jtstorck] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
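The precedence logic the ticket is adjusting, try the NiFi-specific X-Proxy* header first, then fall back to the X-Forwarded-* equivalent, can be sketched with a plain Map. The class name and Map-based lookup below are illustrative; in NiFi, ApplicationResource's getFirstHeaderValue reads these from the servlet request.

```java
import java.util.Map;

// Sketch of the header-precedence logic discussed in NIFI-5748: the first
// header name that has a value wins. With the proposed change, the fallback
// for the host becomes X-Forwarded-Host (what the client requested) instead
// of X-Forwarded-Server (the proxy's own hostname).
public class ProxyHeaderResolver {

    static String firstHeaderValue(Map<String, String> headers, String... names) {
        for (String name : names) {
            String value = headers.get(name);
            if (value != null) {
                return value;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Simulates a request proxied by Traefik, which sets X-Forwarded-Host
        // but not NiFi's X-ProxyHost header.
        Map<String, String> headers = Map.of("X-Forwarded-Host", "nifi.docker.localhost");
        System.out.println(firstHeaderValue(headers, "X-ProxyHost", "X-Forwarded-Host"));
    }
}
```

Under the current code, the same lookup falls back to X-Forwarded-Server and rewrites URLs with the proxy's internal hostname, which is exactly what the Traefik demo in the gist exposes.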
[jira] [Commented] (NIFI-5625) Support variables for the properties of HTTP processors
[ https://issues.apache.org/jira/browse/NIFI-5625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16664280#comment-16664280 ] ASF GitHub Bot commented on NIFI-5625: -- Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/3020 Mind rebasing this against the latest master? Please and thanks! > Support variables for the properties of HTTP processors > > > Key: NIFI-5625 > URL: https://issues.apache.org/jira/browse/NIFI-5625 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions, Variable Registry >Affects Versions: 1.7.1 >Reporter: Kemix Koo >Priority: Minor > > When some group (global) variables are set, some properties of HTTP processors > don't support expressions. For example, for USER and PASS, if global variables > can't be used, each processor must be changed one by one, which is troublesome. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] nifi issue #3020: NIFI-5625: support the variables for the properties of HTT...
Github user mattyb149 commented on the issue: https://github.com/apache/nifi/pull/3020 Mind rebasing this against the latest master? Please and thanks! ---
[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached
[ https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16664273#comment-16664273 ] ASF GitHub Bot commented on NIFI-5642: -- Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3051#discussion_r228325897 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java --- @@ -33,7 +32,7 @@ import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumWriter; -import org.apache.commons.text.StringEscapeUtils; +import org.apache.commons.lang3.StringEscapeUtils; --- End diff -- I think we're on the verge of using Apache Commons Text instead of Commons Lang 3, maybe consider keeping this the way it is? > QueryCassandra processor : output FlowFiles as soon fetch_size is reached > - > > Key: NIFI-5642 > URL: https://issues.apache.org/jira/browse/NIFI-5642 > Project: Apache NiFi > Issue Type: Bug >Affects Versions: 1.7.1 >Reporter: André Gomes Lamas Otero >Priority: Major > > When I'm using QueryCassandra alongside with fetch_size parameter I expected > that as soon my reader reaches the fetch_size the processor outputs some data > to be processed by the next processor, but QueryCassandra reads all the data, > then output the flow files. > I'll start to work on a patch for this situation, I'll appreciate any > suggestion. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached
[ https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16664272#comment-16664272 ] ASF GitHub Bot commented on NIFI-5642: -- Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3051#discussion_r228325643 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java --- @@ -132,17 +132,25 @@ .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) .build(); +// Relationships public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") -.description("A FlowFile is transferred to this relationship if the operation completed successfully.") +.description("Successfully created FlowFile from CQL query result set.") --- End diff -- This relationship is reused by PutCassandraQL as well, where there is no result set or query per se (it's a statement). That's why the doc is so generic. If you'd like to have the doc be more specific, you can create a REL_SUCCESS relationship in QueryCassandra using `new Relationship.Builder().from(AbstractCassandraProcessor.REL_SUCCESS).description("Your description override").build()` > QueryCassandra processor : output FlowFiles as soon fetch_size is reached > - > > Key: NIFI-5642 > URL: https://issues.apache.org/jira/browse/NIFI-5642 > Project: Apache NiFi > Issue Type: Bug >Affects Versions: 1.7.1 >Reporter: André Gomes Lamas Otero >Priority: Major > > When I'm using QueryCassandra alongside with fetch_size parameter I expected > that as soon my reader reaches the fetch_size the processor outputs some data > to be processed by the next processor, but QueryCassandra reads all the data, > then output the flow files. > I'll start to work on a patch for this situation, I'll appreciate any > suggestion. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached
[ https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16664275#comment-16664275 ] ASF GitHub Bot commented on NIFI-5642: -- Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3051#discussion_r228328178 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java --- @@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext context) { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { +FlowFile inputFlowFile = null; FlowFile fileToProcess = null; + +Map attributes = null; + if (context.hasIncomingConnection()) { -fileToProcess = session.get(); +inputFlowFile = session.get(); // If we have no FlowFile, and all incoming connections are self-loops then we can continue on. // However, if we have no FlowFile and we have connections coming from other Processors, then // we know that we should run only if we have a FlowFile. 
-if (fileToProcess == null && context.hasNonLoopConnection()) { +if (inputFlowFile == null && context.hasNonLoopConnection()) { return; } + +attributes = inputFlowFile.getAttributes(); } final ComponentLog logger = getLogger(); -final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue(); -final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS); +final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(inputFlowFile).getValue(); +final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(inputFlowFile).asTimePeriod(TimeUnit.MILLISECONDS); final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue(); -final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue()); +final long maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger(); +final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue()); final StopWatch stopWatch = new StopWatch(true); -if (fileToProcess == null) { -fileToProcess = session.create(); +if(inputFlowFile != null){ +session.transfer(inputFlowFile, REL_ORIGINAL); } try { // The documentation for the driver recommends the session remain open the entire time the processor is running // and states that it is thread-safe. This is why connectionSession is not in a try-with-resources. 
final Session connectionSession = cassandraSession.get(); -final ResultSetFuture queryFuture = connectionSession.executeAsync(selectQuery); +final ResultSet resultSet; + +if (queryTimeout > 0) { +resultSet = connectionSession.execute(selectQuery, queryTimeout, TimeUnit.MILLISECONDS); +}else{ +resultSet = connectionSession.execute(selectQuery); +} + final AtomicLong nrOfRows = new AtomicLong(0L); -fileToProcess = session.write(fileToProcess, new OutputStreamCallback() { -@Override -public void process(final OutputStream out) throws IOException { -try { -logger.debug("Executing CQL query {}", new Object[]{selectQuery}); -final ResultSet resultSet; -if (queryTimeout > 0) { -resultSet = queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS); -if (AVRO_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, TimeUnit.MILLISECONDS)); -} else if (JSON_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout, TimeUnit.MILLISECONDS)); -} -} else { -resultSet =
[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached
[ https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16664274#comment-16664274 ] ASF GitHub Bot commented on NIFI-5642: -- Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3051#discussion_r228327507 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java --- @@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext context) { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { +FlowFile inputFlowFile = null; FlowFile fileToProcess = null; + +Map attributes = null; + if (context.hasIncomingConnection()) { -fileToProcess = session.get(); +inputFlowFile = session.get(); // If we have no FlowFile, and all incoming connections are self-loops then we can continue on. // However, if we have no FlowFile and we have connections coming from other Processors, then // we know that we should run only if we have a FlowFile. 
-if (fileToProcess == null && context.hasNonLoopConnection()) { +if (inputFlowFile == null && context.hasNonLoopConnection()) { return; } + +attributes = inputFlowFile.getAttributes(); } final ComponentLog logger = getLogger(); -final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue(); -final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS); +final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(inputFlowFile).getValue(); +final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(inputFlowFile).asTimePeriod(TimeUnit.MILLISECONDS); final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue(); -final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue()); +final long maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger(); +final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue()); final StopWatch stopWatch = new StopWatch(true); -if (fileToProcess == null) { -fileToProcess = session.create(); +if(inputFlowFile != null){ +session.transfer(inputFlowFile, REL_ORIGINAL); } try { // The documentation for the driver recommends the session remain open the entire time the processor is running // and states that it is thread-safe. This is why connectionSession is not in a try-with-resources. 
final Session connectionSession = cassandraSession.get(); -final ResultSetFuture queryFuture = connectionSession.executeAsync(selectQuery); +final ResultSet resultSet; + +if (queryTimeout > 0) { +resultSet = connectionSession.execute(selectQuery, queryTimeout, TimeUnit.MILLISECONDS); +}else{ +resultSet = connectionSession.execute(selectQuery); +} + final AtomicLong nrOfRows = new AtomicLong(0L); -fileToProcess = session.write(fileToProcess, new OutputStreamCallback() { -@Override -public void process(final OutputStream out) throws IOException { -try { -logger.debug("Executing CQL query {}", new Object[]{selectQuery}); -final ResultSet resultSet; -if (queryTimeout > 0) { -resultSet = queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS); -if (AVRO_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, TimeUnit.MILLISECONDS)); -} else if (JSON_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout, TimeUnit.MILLISECONDS)); -} -} else { -resultSet =
[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached
[ https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16664276#comment-16664276 ] ASF GitHub Bot commented on NIFI-5642: -- Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3051#discussion_r228331847 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java --- @@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext context) { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { +FlowFile inputFlowFile = null; FlowFile fileToProcess = null; + +Map attributes = null; + if (context.hasIncomingConnection()) { -fileToProcess = session.get(); +inputFlowFile = session.get(); // If we have no FlowFile, and all incoming connections are self-loops then we can continue on. // However, if we have no FlowFile and we have connections coming from other Processors, then // we know that we should run only if we have a FlowFile. 
-if (fileToProcess == null && context.hasNonLoopConnection()) { +if (inputFlowFile == null && context.hasNonLoopConnection()) { return; } + +attributes = inputFlowFile.getAttributes(); } final ComponentLog logger = getLogger(); -final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue(); -final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS); +final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(inputFlowFile).getValue(); +final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(inputFlowFile).asTimePeriod(TimeUnit.MILLISECONDS); final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue(); -final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue()); +final long maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger(); +final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue()); final StopWatch stopWatch = new StopWatch(true); -if (fileToProcess == null) { -fileToProcess = session.create(); +if(inputFlowFile != null){ +session.transfer(inputFlowFile, REL_ORIGINAL); } try { // The documentation for the driver recommends the session remain open the entire time the processor is running // and states that it is thread-safe. This is why connectionSession is not in a try-with-resources. 
final Session connectionSession = cassandraSession.get(); -final ResultSetFuture queryFuture = connectionSession.executeAsync(selectQuery); +final ResultSet resultSet; + +if (queryTimeout > 0) { +resultSet = connectionSession.execute(selectQuery, queryTimeout, TimeUnit.MILLISECONDS); +}else{ +resultSet = connectionSession.execute(selectQuery); +} + final AtomicLong nrOfRows = new AtomicLong(0L); -fileToProcess = session.write(fileToProcess, new OutputStreamCallback() { -@Override -public void process(final OutputStream out) throws IOException { -try { -logger.debug("Executing CQL query {}", new Object[]{selectQuery}); -final ResultSet resultSet; -if (queryTimeout > 0) { -resultSet = queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS); -if (AVRO_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, TimeUnit.MILLISECONDS)); -} else if (JSON_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout, TimeUnit.MILLISECONDS)); -} -} else { -resultSet =
[jira] [Commented] (NIFI-5642) QueryCassandra processor : output FlowFiles as soon fetch_size is reached
[ https://issues.apache.org/jira/browse/NIFI-5642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16664277#comment-16664277 ] ASF GitHub Bot commented on NIFI-5642: -- Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3051#discussion_r228332165 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java --- @@ -400,77 +478,87 @@ public static long convertToJsonStream(final ResultSet rs, final OutputStream ou outStream.write("{\"results\":[".getBytes(charset)); final ColumnDefinitions columnDefinitions = rs.getColumnDefinitions(); long nrOfRows = 0; +long rowsAvailableWithoutFetching = rs.getAvailableWithoutFetching(); + if (columnDefinitions != null) { -do { - -// Grab the ones we have -int rowsAvailableWithoutFetching = rs.getAvailableWithoutFetching(); -if (rowsAvailableWithoutFetching == 0) { -// Get more -if (timeout <= 0 || timeUnit == null) { -rs.fetchMoreResults().get(); -} else { -rs.fetchMoreResults().get(timeout, timeUnit); -} + +// Grab the ones we have +if (rowsAvailableWithoutFetching == 0) { +// Get more +if (timeout <= 0 || timeUnit == null) { +rs.fetchMoreResults().get(); +} else { +rs.fetchMoreResults().get(timeout, timeUnit); } +rowsAvailableWithoutFetching = rs.getAvailableWithoutFetching(); +} -for (Row row : rs) { -if (nrOfRows != 0) { +if(maxRowsPerFlowFile == 0){ +maxRowsPerFlowFile = rowsAvailableWithoutFetching; +} +Row row; +while(nrOfRows < maxRowsPerFlowFile){ +try { +row = rs.iterator().next(); +}catch (NoSuchElementException nsee){ +//nrOfRows -= 1; --- End diff -- This is commented out here but active in the Avro version above, I assume they need to be the same? 
> QueryCassandra processor : output FlowFiles as soon fetch_size is reached > - > > Key: NIFI-5642 > URL: https://issues.apache.org/jira/browse/NIFI-5642 > Project: Apache NiFi > Issue Type: Bug >Affects Versions: 1.7.1 >Reporter: André Gomes Lamas Otero >Priority: Major > > When I'm using QueryCassandra alongside the fetch_size parameter, I expected > that as soon as my reader reaches the fetch_size the processor would output some data > to be processed by the next processor, but QueryCassandra reads all the data and > then outputs the FlowFiles. > I'll start working on a patch for this situation; I'd appreciate any > suggestions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
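The batching behavior requested in NIFI-5642 (emit output as soon as a row cap is reached instead of draining the whole result set first) can be illustrated with a plain-Java sketch. This is a hedged illustration, not the processor's actual code: the `Iterator<String>` stands in for the Cassandra driver's `ResultSet` iterator, and `RowBatcher`/`maxRowsPerBatch` are hypothetical names mirroring the proposed Max Rows Per Flow File property.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class RowBatcher {

    // Split a row iterator into batches of at most maxRowsPerBatch rows,
    // completing each batch as soon as it fills up -- the behavior the
    // issue asks QueryCassandra to adopt for FlowFiles.
    public static List<List<String>> batch(Iterator<String> rows, int maxRowsPerBatch) {
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        while (rows.hasNext()) {
            current.add(rows.next());
            if (current.size() == maxRowsPerBatch) {
                batches.add(current);          // a full "FlowFile" is ready here
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            batches.add(current);              // final partial batch
        }
        return batches;
    }

    public static void main(String[] args) {
        Iterator<String> rows = Arrays.asList("r1", "r2", "r3", "r4", "r5").iterator();
        System.out.println(batch(rows, 2));    // [[r1, r2], [r3, r4], [r5]]
    }
}
```

With `maxRowsPerBatch = 2`, five rows yield three batches, so downstream processing could begin after the first two rows rather than only after all five have been read.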
[GitHub] nifi pull request #3051: NIFI-5642: QueryCassandra processor : output FlowFi...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3051#discussion_r228325897 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java --- @@ -33,7 +32,7 @@ import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumWriter; -import org.apache.commons.text.StringEscapeUtils; +import org.apache.commons.lang3.StringEscapeUtils; --- End diff -- I think we're on the verge of using Apache Commons Text instead of Commons Lang 3, maybe consider keeping this the way it is? ---
[GitHub] nifi pull request #3051: NIFI-5642: QueryCassandra processor : output FlowFi...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3051#discussion_r228325643 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java --- @@ -132,17 +132,25 @@ .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) .build(); +// Relationships public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") -.description("A FlowFile is transferred to this relationship if the operation completed successfully.") +.description("Successfully created FlowFile from CQL query result set.") --- End diff -- This relationship is reused by PutCassandraQL as well, where there is no result set or query per se (it's a statement). That's why the doc is so generic. If you'd like to have the doc be more specific, you can create a REL_SUCCESS relationship in QueryCassandra using `new Relationship.Builder().from(AbstractCassandraProcessor.REL_SUCCESS).description("Your description override").build()` ---
[GitHub] nifi pull request #3051: NIFI-5642: QueryCassandra processor : output FlowFi...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3051#discussion_r228328178 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java --- @@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext context) { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { +FlowFile inputFlowFile = null; FlowFile fileToProcess = null; + +Map attributes = null; + if (context.hasIncomingConnection()) { -fileToProcess = session.get(); +inputFlowFile = session.get(); // If we have no FlowFile, and all incoming connections are self-loops then we can continue on. // However, if we have no FlowFile and we have connections coming from other Processors, then // we know that we should run only if we have a FlowFile. -if (fileToProcess == null && context.hasNonLoopConnection()) { +if (inputFlowFile == null && context.hasNonLoopConnection()) { return; } + +attributes = inputFlowFile.getAttributes(); } final ComponentLog logger = getLogger(); -final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue(); -final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS); +final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(inputFlowFile).getValue(); +final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(inputFlowFile).asTimePeriod(TimeUnit.MILLISECONDS); final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue(); -final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue()); +final long maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger(); 
+final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue()); final StopWatch stopWatch = new StopWatch(true); -if (fileToProcess == null) { -fileToProcess = session.create(); +if(inputFlowFile != null){ +session.transfer(inputFlowFile, REL_ORIGINAL); } try { // The documentation for the driver recommends the session remain open the entire time the processor is running // and states that it is thread-safe. This is why connectionSession is not in a try-with-resources. final Session connectionSession = cassandraSession.get(); -final ResultSetFuture queryFuture = connectionSession.executeAsync(selectQuery); +final ResultSet resultSet; + +if (queryTimeout > 0) { +resultSet = connectionSession.execute(selectQuery, queryTimeout, TimeUnit.MILLISECONDS); +}else{ +resultSet = connectionSession.execute(selectQuery); +} + final AtomicLong nrOfRows = new AtomicLong(0L); -fileToProcess = session.write(fileToProcess, new OutputStreamCallback() { -@Override -public void process(final OutputStream out) throws IOException { -try { -logger.debug("Executing CQL query {}", new Object[]{selectQuery}); -final ResultSet resultSet; -if (queryTimeout > 0) { -resultSet = queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS); -if (AVRO_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, TimeUnit.MILLISECONDS)); -} else if (JSON_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout, TimeUnit.MILLISECONDS)); -} -} else { -resultSet = queryFuture.getUninterruptibly(); -if (AVRO_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToAvroStream(resultSet, out, 0, null)); -
[GitHub] nifi pull request #3051: NIFI-5642: QueryCassandra processor : output FlowFi...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3051#discussion_r228327507 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java --- @@ -191,76 +203,110 @@ public void onScheduled(final ProcessContext context) { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { +FlowFile inputFlowFile = null; FlowFile fileToProcess = null; + +Map attributes = null; + if (context.hasIncomingConnection()) { -fileToProcess = session.get(); +inputFlowFile = session.get(); // If we have no FlowFile, and all incoming connections are self-loops then we can continue on. // However, if we have no FlowFile and we have connections coming from other Processors, then // we know that we should run only if we have a FlowFile. -if (fileToProcess == null && context.hasNonLoopConnection()) { +if (inputFlowFile == null && context.hasNonLoopConnection()) { return; } + +attributes = inputFlowFile.getAttributes(); } final ComponentLog logger = getLogger(); -final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue(); -final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(fileToProcess).asTimePeriod(TimeUnit.MILLISECONDS); +final String selectQuery = context.getProperty(CQL_SELECT_QUERY).evaluateAttributeExpressions(inputFlowFile).getValue(); +final long queryTimeout = context.getProperty(QUERY_TIMEOUT).evaluateAttributeExpressions(inputFlowFile).asTimePeriod(TimeUnit.MILLISECONDS); final String outputFormat = context.getProperty(OUTPUT_FORMAT).getValue(); -final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(fileToProcess).getValue()); +final long maxRowsPerFlowFile = context.getProperty(MAX_ROWS_PER_FLOW_FILE).evaluateAttributeExpressions().asInteger(); 
+final Charset charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(inputFlowFile).getValue()); final StopWatch stopWatch = new StopWatch(true); -if (fileToProcess == null) { -fileToProcess = session.create(); +if(inputFlowFile != null){ +session.transfer(inputFlowFile, REL_ORIGINAL); } try { // The documentation for the driver recommends the session remain open the entire time the processor is running // and states that it is thread-safe. This is why connectionSession is not in a try-with-resources. final Session connectionSession = cassandraSession.get(); -final ResultSetFuture queryFuture = connectionSession.executeAsync(selectQuery); +final ResultSet resultSet; + +if (queryTimeout > 0) { +resultSet = connectionSession.execute(selectQuery, queryTimeout, TimeUnit.MILLISECONDS); +}else{ +resultSet = connectionSession.execute(selectQuery); +} + final AtomicLong nrOfRows = new AtomicLong(0L); -fileToProcess = session.write(fileToProcess, new OutputStreamCallback() { -@Override -public void process(final OutputStream out) throws IOException { -try { -logger.debug("Executing CQL query {}", new Object[]{selectQuery}); -final ResultSet resultSet; -if (queryTimeout > 0) { -resultSet = queryFuture.getUninterruptibly(queryTimeout, TimeUnit.MILLISECONDS); -if (AVRO_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToAvroStream(resultSet, out, queryTimeout, TimeUnit.MILLISECONDS)); -} else if (JSON_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToJsonStream(resultSet, out, charset, queryTimeout, TimeUnit.MILLISECONDS)); -} -} else { -resultSet = queryFuture.getUninterruptibly(); -if (AVRO_FORMAT.equals(outputFormat)) { - nrOfRows.set(convertToAvroStream(resultSet, out, 0, null)); -
[jira] [Commented] (MINIFICPP-653) Log message will segfault client if no content produced
[ https://issues.apache.org/jira/browse/MINIFICPP-653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16664199#comment-16664199 ] ASF GitHub Bot commented on MINIFICPP-653: -- Github user phrocker commented on the issue: https://github.com/apache/nifi-minifi-cpp/pull/427 One of the docker tests fails -- currently debugging. > Log message will segfault client if no content produced > --- > > Key: MINIFICPP-653 > URL: https://issues.apache.org/jira/browse/MINIFICPP-653 > Project: NiFi MiNiFi C++ > Issue Type: Improvement >Reporter: Mr TheSegfault >Assignee: Mr TheSegfault >Priority: Blocker > > Log message will segfault client if no content produced -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-5754) bootstrap.conf not found in NiFiPropertiesLoader with custom conf.dir value
[ https://issues.apache.org/jira/browse/NIFI-5754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16664144#comment-16664144 ] Jody DesRoches commented on NIFI-5754: -- Sorry, I don't have an environment in which I can test and build a pull request at this time. It appears that calling the static method org.apache.nifi.bootstrap.RunNiFi.getDefaultBootstrapConfFile() in org.apache.nifi.properties.NiFiPropertiesLoader would be a good solution to ensure consistency. > bootstrap.conf not found in NiFiPropertiesLoader with custom conf.dir value > --- > > Key: NIFI-5754 > URL: https://issues.apache.org/jira/browse/NIFI-5754 > Project: Apache NiFi > Issue Type: Bug > Components: Configuration Management >Affects Versions: 1.7.1 > Environment: N/A >Reporter: Jody DesRoches >Priority: Major > > Issue in NiFiPropertiesLoader when trying to find > "nifi.bootstrap.sensitive.key=" > When starting the nifi service, $NIFI_HOME/conf/bootstrap.conf is used. When > NiFiPropertiesLoader attempts to decrypt nifi.properties it guesses that > bootstrap.conf exists in the same directory as nifi.properties. This is not > true if "conf.dir" has been defined in bootstrap.conf. > Apparent problem is on NiFiPropertiesLoader line 113. Method > extractKeyFromBootstrapFile() should call > extractKeyFromBootstrapFile("$NIFI_HOME/conf/bootstrap.conf") by default as > the method comment indicates. Instead it calls this method with an empty > string resulting in an IOException on service startup if the conf.dir has > been customized. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-5754) bootstrap.conf not found in NiFiPropertiesLoader with custom conf.dir value
[ https://issues.apache.org/jira/browse/NIFI-5754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16664089#comment-16664089 ] Jody DesRoches commented on NIFI-5754: -- A workaround is to keep a copy of bootstrap.conf in both $NIFI_HOME/conf and conf.dir/, but I haven't tested how that would be affected by running encrypt-config.sh. Where will the hexkey actually be stored? > bootstrap.conf not found in NiFiPropertiesLoader with custom conf.dir value > --- > > Key: NIFI-5754 > URL: https://issues.apache.org/jira/browse/NIFI-5754 > Project: Apache NiFi > Issue Type: Bug > Components: Configuration Management >Affects Versions: 1.7.1 > Environment: N/A >Reporter: Jody DesRoches >Priority: Major > > Issue in NiFiPropertiesLoader when trying to find > "nifi.bootstrap.sensitive.key=" > When starting the nifi service, $NIFI_HOME/conf/bootstrap.conf is used. When > NiFiPropertiesLoader attempts to decrypt nifi.properties it guesses that > bootstrap.conf exists in the same directory as nifi.properties. This is not > true if "conf.dir" has been defined in bootstrap.conf. > Apparent problem is on NiFiPropertiesLoader line 113. Method > extractKeyFromBootstrapFile() should call > extractKeyFromBootstrapFile("$NIFI_HOME/conf/bootstrap.conf") by default as > the method comment indicates. Instead it calls this method with an empty > string resulting in an IOException on service startup if the conf.dir has > been customized. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (NIFI-5754) bootstrap.conf not found in NiFiPropertiesLoader with custom conf.dir value
Jody DesRoches created NIFI-5754: Summary: bootstrap.conf not found in NiFiPropertiesLoader with custom conf.dir value Key: NIFI-5754 URL: https://issues.apache.org/jira/browse/NIFI-5754 Project: Apache NiFi Issue Type: Bug Components: Configuration Management Affects Versions: 1.7.1 Environment: N/A Reporter: Jody DesRoches Issue in NiFiPropertiesLoader when trying to find "nifi.bootstrap.sensitive.key=" When starting the nifi service, $NIFI_HOME/conf/bootstrap.conf is used. When NiFiPropertiesLoader attempts to decrypt nifi.properties it guesses that bootstrap.conf exists in the same directory as nifi.properties. This is not true if "conf.dir" has been defined in bootstrap.conf. Apparent problem is on NiFiPropertiesLoader line 113. Method extractKeyFromBootstrapFile() should call extractKeyFromBootstrapFile("$NIFI_HOME/conf/bootstrap.conf") by default as the method comment indicates. Instead it calls this method with an empty string resulting in an IOException on service startup if the conf.dir has been customized. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
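A minimal sketch of the fix direction suggested in this report: fall back to a well-known default location instead of an empty string when no explicit bootstrap.conf path is available. `BootstrapConfResolver` and `resolveBootstrapConfPath` are hypothetical names used for illustration only; the real fix would presumably delegate to RunNiFi.getDefaultBootstrapConfFile() as proposed in the comments.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class BootstrapConfResolver {

    // Hypothetical helper mirroring the suggested fix: use the explicit path
    // when one is provided, otherwise default to $NIFI_HOME/conf/bootstrap.conf
    // instead of an empty string (the empty string is what triggered the
    // IOException on startup when conf.dir was customized).
    public static Path resolveBootstrapConfPath(String explicitPath, String nifiHome) {
        if (explicitPath != null && !explicitPath.isEmpty()) {
            return Paths.get(explicitPath);
        }
        return Paths.get(nifiHome, "conf", "bootstrap.conf");
    }

    public static void main(String[] args) {
        // Prints /opt/nifi/conf/bootstrap.conf on a Unix filesystem
        System.out.println(resolveBootstrapConfPath(null, "/opt/nifi"));
        System.out.println(resolveBootstrapConfPath("/custom/dir/bootstrap.conf", "/opt/nifi"));
    }
}
```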
[jira] [Commented] (NIFI-5149) GetSplunk Processor should have input flow, which is currently missing
[ https://issues.apache.org/jira/browse/NIFI-5149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663974#comment-16663974 ] Mohammad commented on NIFI-5149: Thank you for the feedback, [~aclowkey]! I am having a similar challenge, so I'd like to elaborate on the use case that [~bende] brought up: The processor has a query parameter, which is good if you have a 'permanent' query that you run regularly ['show me where disk=full']. However, if I'd like to (for example) generate a list of queries, and dynamically execute them in Splunk _(which appears to be a popular use case)_, the processor does not support taking the output from a previous processor ['disk=75%', 'disk=85%', 'disk=95%', ...], and setting it as a variable that I could use in the Query parameter when a queue is ready. > GetSplunk Processor should have input flow, which is currently missing > -- > > Key: NIFI-5149 > URL: https://issues.apache.org/jira/browse/NIFI-5149 > Project: Apache NiFi > Issue Type: New Feature > Components: Core Framework >Affects Versions: 1.6.0 >Reporter: Brajendra >Priority: Critical > Labels: getSplunk > > Hi Team, > We have found there is only a 'GetSplunk' processor to connect to and query > Splunk in Apache NiFi. > Hence, this processor does not take any type of input. > > Do we have another type of Apache NiFi processor which can take parameters > as input (details of Splunk indexes, query, instance, etc.) from another > processor? > If not, then please suggest when such a processor can be expected in an > upcoming release? > > Environment: NiFi 1.5.0 and 1.6.0 > Brajendra Mishra -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (NIFI-5751) Use the GetFile Processor can't get a complete file
[ https://issues.apache.org/jira/browse/NIFI-5751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joseph Witt resolved NIFI-5751. --- Resolution: Information Provided Assignee: Joseph Witt > Use the GetFile Processor can't get a complete file > --- > > Key: NIFI-5751 > URL: https://issues.apache.org/jira/browse/NIFI-5751 > Project: Apache NiFi > Issue Type: Bug >Affects Versions: 1.7.1 > Environment: centos7 >Reporter: weijia.hao >Assignee: Joseph Witt >Priority: Major > Attachments: filesize.png, input.png, output.png, processer.png, > putfile.png > > > I use the GetFile and PutFile processors to realize mv functionality. All > configurations are default. When I mv a file to the GetFile processor's Input > directory, this file will be completely moved to the PutFile processor's > output directory. But when I cp a file to the processor's Input Directory > slowly, this file will be cut into pieces. For example, I put a 1.2G file into > the Input directory; when the process is over, there are only 2M of files in the > output directory. How do I move a complete file? > !input.png!!output.png!!processer.png! > > !putfile.png!!filesize.png! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-5751) Use the GetFile Processor can't get a complete file
[ https://issues.apache.org/jira/browse/NIFI-5751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663910#comment-16663910 ] Joseph Witt commented on NIFI-5751: --- Hello. This appears to be a classic race-condition configuration. If the process writing the file and the process reading the file (NiFi) have no way of sharing information on readiness, then this will happen as you describe. You must choose one of the following styles: - filename-based waiting (usually a '.' prepended to the filename marks it as 'hidden' by intent, but you could use other naming). Once writing completes, rename to the name without the '.' (or whatever pattern you chose). GetFile supports this with ignore hidden files and pattern-based pulling. - waiting for the mod time to be a certain age. GetFile supports this with min file age. > Use the GetFile Processor can't get a complete file > --- > > Key: NIFI-5751 > URL: https://issues.apache.org/jira/browse/NIFI-5751 > Project: Apache NiFi > Issue Type: Bug >Affects Versions: 1.7.1 > Environment: centos7 >Reporter: weijia.hao >Priority: Major > Attachments: filesize.png, input.png, output.png, processer.png, > putfile.png > > > I use the GetFile and PutFile processors to realize mv functionality. All > configurations are default. When I mv a file to the GetFile processor's Input > directory, this file will be completely moved to the PutFile processor's > output directory. But when I cp a file to the processor's Input Directory > slowly, this file will be cut into pieces. For example, I put a 1.2G file into > the Input directory; when the process is over, there are only 2M of files in the > output directory. How do I move a complete file? > !input.png!!output.png!!processer.png! > > !putfile.png!!filesize.png! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
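The filename-based style described in the comment above can be sketched with standard java.nio file operations. This is an illustrative producer-side pattern, not NiFi code: write under a dot-prefixed name, then atomically rename into place, so a GetFile configured to ignore hidden files never picks up a partially written file. `SafeDrop` and the file names are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class SafeDrop {

    // Write the payload under a hidden (dot-prefixed) name first, then
    // atomically rename it into place. A consumer that skips dot-files
    // only ever observes complete files.
    public static Path drop(Path dir, String name, byte[] payload) throws IOException {
        Path hidden = dir.resolve("." + name);  // invisible to the consumer while writing
        Files.write(hidden, payload);
        Path finalPath = dir.resolve(name);
        return Files.move(hidden, finalPath, StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("drop");
        Path f = drop(dir, "data.csv", "a,b,c\n".getBytes());
        System.out.println(Files.exists(f));    // true
    }
}
```

The mod-time alternative needs no producer cooperation at all: a Minimum File Age comfortably larger than the slowest expected copy achieves a similar effect.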
[jira] [Commented] (NIFI-5621) Create a Connection Pooling service implementation to be used in Cassandra processors
[ https://issues.apache.org/jira/browse/NIFI-5621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663902#comment-16663902 ] ASF GitHub Bot commented on NIFI-5621: -- Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/3105 Easiest way to do the L for your NAR here is this: `mvn dependency:tree | grep -i compile` Provided and test dependencies aren't part of the NAR, so only focus on compile. > Create a Connection Pooling service implementation to be used in Cassandra > processors > - > > Key: NIFI-5621 > URL: https://issues.apache.org/jira/browse/NIFI-5621 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions >Reporter: Sivaprasanna Sethuraman >Assignee: Sivaprasanna Sethuraman >Priority: Major > > Like how the Relational Database processors leverage 'DBCPConnectionPool' > controller service, there should be one that could be used by the processors > from Cassandra bundle. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] nifi issue #3105: NIFI-5621: Added Cassandra connection provider service
Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/3105 Easiest way to do the L for your NAR here is this: `mvn dependency:tree | grep -i compile` Provided and test dependencies aren't part of the NAR, so only focus on compile. ---
[jira] [Commented] (NIFI-5746) The SEND and RECEIVE provenance events for load balancing do not have the same transit uri syntax
[ https://issues.apache.org/jira/browse/NIFI-5746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663839#comment-16663839 ] ASF GitHub Bot commented on NIFI-5746: -- Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3109#discussion_r228206921
--- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java ---
@@ -130,17 +130,14 @@ public void receiveFlowFiles(final Socket socket) throws IOException {
         final Set certIdentities;
         try {
             certIdentities = getCertificateIdentities(sslSession);
-
-            final String dn = CertificateUtils.extractPeerDNFromSSLSocket(socket);
-            peerDescription = CertificateUtils.extractUsername(dn);
         } catch (final CertificateException e) {
             throw new IOException("Failed to extract Client Certificate", e);
         }

         logger.debug("Connection received from peer {}. Will perform authorization against Client Identities '{}'", peerDescription, certIdentities);

-        authorizer.authorize(certIdentities);
+        peerDescription = authorizer.authorize(certIdentities);
--- End diff -- Wow, good catch, thanks! I wondered "How did I miss that?" but it turns out that the system I tested it on was different from the system on which I first noticed the problem. On the system I used to test, they ended up being the same value :) Will push a new commit. At this point, we don't even need 'nodeName', just peerDescription.
> The SEND and RECEIVE provenance events for load balancing do not have the > same transit uri syntax > - > > Key: NIFI-5746 > URL: https://issues.apache.org/jira/browse/NIFI-5746 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Affects Versions: 1.8.0 >Reporter: Mark Payne >Assignee: Mark Payne >Priority: Major > Fix For: 1.9.0 > > > The SEND event has a transit uri like nifi:connection:<connection uuid> > The RECEIVE event has a transit uri like nifi://<node address>/loadbalance/<connection uuid> > The RECEIVE event is much preferred, as it indicates not only that the > transfer was via load balance but also includes the address of the node and > the UUID of the connection. The SEND Transit URI should be changed to > nifi://<node address>/loadbalance/<connection uuid> -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] nifi pull request #3109: NIFI-5746: Use Node Identifier's node address inste...
Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/3109#discussion_r228206921
--- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java ---
@@ -130,17 +130,14 @@ public void receiveFlowFiles(final Socket socket) throws IOException {
         final Set certIdentities;
         try {
             certIdentities = getCertificateIdentities(sslSession);
-
-            final String dn = CertificateUtils.extractPeerDNFromSSLSocket(socket);
-            peerDescription = CertificateUtils.extractUsername(dn);
         } catch (final CertificateException e) {
             throw new IOException("Failed to extract Client Certificate", e);
         }

         logger.debug("Connection received from peer {}. Will perform authorization against Client Identities '{}'", peerDescription, certIdentities);

-        authorizer.authorize(certIdentities);
+        peerDescription = authorizer.authorize(certIdentities);
--- End diff -- Wow, good catch, thanks! I wondered "How did I miss that?" but it turns out that the system I tested it on was different from the system on which I first noticed the problem. On the system I used to test, they ended up being the same value :) Will push a new commit. At this point, we don't even need 'nodeName', just peerDescription. ---
[jira] [Created] (NIFI-5753) Add SSL support to HortonworksSchemaRegistry service
Grzegorz Kołakowski created NIFI-5753: - Summary: Add SSL support to HortonworksSchemaRegistry service Key: NIFI-5753 URL: https://issues.apache.org/jira/browse/NIFI-5753 Project: Apache NiFi Issue Type: Improvement Components: Extensions Reporter: Grzegorz Kołakowski Currently the HortonworksSchemaRegistry service does not support communication over HTTPS. We should be able to add an SSL context to the service and pass it to the underlying schema registry client. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (MINIFICPP-654) C API: failure callback improvements
[ https://issues.apache.org/jira/browse/MINIFICPP-654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663796#comment-16663796 ] Mr TheSegfault commented on MINIFICPP-654: -- There are also several comments on the related ticket. I'll attempt to find those and copy them into here to discuss. > C API: failure callback improvements > > > Key: MINIFICPP-654 > URL: https://issues.apache.org/jira/browse/MINIFICPP-654 > Project: NiFi MiNiFi C++ > Issue Type: Improvement >Reporter: Arpad Boda >Assignee: Arpad Boda >Priority: Minor > Fix For: 0.6.0 > > > Improvements and further discussion of failure callbacks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (MINIFICPP-654) C API: failure callback improvements
[ https://issues.apache.org/jira/browse/MINIFICPP-654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663795#comment-16663795 ] Mr TheSegfault commented on MINIFICPP-654: -- Failure handling should be aware of rollback, meaning that data shouldn't be copied via a callback. We should natively provide support for rolling back data from the processor that failed (if it failed). This gives users the ability to get data that has not been partially processed, so another route could be taken or something else could be done with that data. > C API: failure callback improvements > > > Key: MINIFICPP-654 > URL: https://issues.apache.org/jira/browse/MINIFICPP-654 > Project: NiFi MiNiFi C++ > Issue Type: Improvement >Reporter: Arpad Boda >Assignee: Arpad Boda >Priority: Minor > Fix For: 0.6.0 > > > Improvements and further discussion of failure callbacks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (MINIFICPP-656) Discuss Failure handling in CAPI
[ https://issues.apache.org/jira/browse/MINIFICPP-656?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mr TheSegfault resolved MINIFICPP-656. -- Resolution: Fixed OBE in favor of MINIFICPP-654 > Discuss Failure handling in CAPI > > > Key: MINIFICPP-656 > URL: https://issues.apache.org/jira/browse/MINIFICPP-656 > Project: NiFi MiNiFi C++ > Issue Type: Improvement >Reporter: Mr TheSegfault >Priority: Major > > [~aboda] we could use this as a discussion thread. I'll begin adding some > comments. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (MINIFICPP-656) Discuss Failure handling in CAPI
[ https://issues.apache.org/jira/browse/MINIFICPP-656?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mr TheSegfault closed MINIFICPP-656. Assignee: Mr TheSegfault > Discuss Failure handling in CAPI > > > Key: MINIFICPP-656 > URL: https://issues.apache.org/jira/browse/MINIFICPP-656 > Project: NiFi MiNiFi C++ > Issue Type: Improvement >Reporter: Mr TheSegfault >Assignee: Mr TheSegfault >Priority: Major > > [~aboda] we could use this as a discussion thread. I'll begin adding some > comments. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (MINIFICPP-656) Discuss Failure handling in CAPI
Mr TheSegfault created MINIFICPP-656: Summary: Discuss Failure handling in CAPI Key: MINIFICPP-656 URL: https://issues.apache.org/jira/browse/MINIFICPP-656 Project: NiFi MiNiFi C++ Issue Type: Improvement Reporter: Mr TheSegfault [~aboda] we could use this as a discussion thread. I'll begin adding some comments. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-5752) Load balancing fails with wildcard certs
[ https://issues.apache.org/jira/browse/NIFI-5752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663574#comment-16663574 ] ASF GitHub Bot commented on NIFI-5752: -- GitHub user kotarot opened a pull request: https://github.com/apache/nifi/pull/3110 NIFI-5752: Load balancing fails with wildcard certs Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [X] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [X] Does your PR title start with NIFI- where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [X] Has your PR been rebased against the latest commit within the target branch (typically master)? - [X] Is your initial contribution a single, squashed commit? ### For code changes: - [X] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder? - [ ] Have you written or updated unit tests to verify your changes? - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly? - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly? - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/kotarot/nifi NIFI-5752 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/3110.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3110 commit 256edff0c4515c94093030efc8ad20e45819b963 Author: Kotaro Terada Date: 2018-10-25T10:46:06Z NIFI-5752: Load balancing fails with wildcard certs > Load balancing fails with wildcard certs > > > Key: NIFI-5752 > URL: https://issues.apache.org/jira/browse/NIFI-5752 > Project: Apache NiFi > Issue Type: Bug >Affects Versions: 1.8.0 >Reporter: Kotaro Terada >Priority: Major > > Load balancing fails when we construct a secure cluster with wildcard certs. > For example, assume that we have a valid wildcard cert for {{*.example.com}} > and a cluster consists of {{nf1.example.com}}, {{nf2.example.com}}, and > {{nf3.example.com}} . 
We cannot transfer a FlowFile between nodes for load > balancing because of the following authorization error: > {noformat} > 2018-10-25 19:05:13,520 WARN [Load Balance Server Thread-2] > o.a.n.c.q.c.s.ClusterLoadBalanceAuthorizer Authorization failed for Client > ID's [*.example.com] to Load Balance data because none of the ID's are known > Cluster Node Identifiers > 2018-10-25 19:05:13,521 ERROR [Load Balance Server Thread-2] > o.a.n.c.q.c.s.ConnectionLoadBalanceServer Failed to communicate with Peer > /xxx.xxx.xxx.xxx:x > org.apache.nifi.controller.queue.clustered.server.NotAuthorizedException: > Client ID's [*.example.com] are not authorized to Load Balance data > at > org.apache.nifi.controller.queue.clustered.server.ClusterLoadBalanceAuthorizer.authorize(ClusterLoadBalanceAuthorizer.java:65) > at > org.apache.nifi.controller.queue.clustered.server.StandardLoadBalanceProtocol.receiveFlowFiles(StandardLoadBalanceProtocol.java:142) > at > org.apache.nifi.controller.queue.clustered.server.ConnectionLoadBalanceServer$CommunicateAction.run(ConnectionLoadBalanceServer.java:176) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at >
[GitHub] nifi pull request #3110: NIFI-5752: Load balancing fails with wildcard certs
GitHub user kotarot opened a pull request: https://github.com/apache/nifi/pull/3110 NIFI-5752: Load balancing fails with wildcard certs Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [X] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [X] Does your PR title start with NIFI- where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [X] Has your PR been rebased against the latest commit within the target branch (typically master)? - [X] Is your initial contribution a single, squashed commit? ### For code changes: - [X] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder? - [ ] Have you written or updated unit tests to verify your changes? - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly? - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly? - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/kotarot/nifi NIFI-5752 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/3110.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3110 commit 256edff0c4515c94093030efc8ad20e45819b963 Author: Kotaro Terada Date: 2018-10-25T10:46:06Z NIFI-5752: Load balancing fails with wildcard certs ---
[jira] [Created] (NIFI-5752) Load balancing fails with wildcard certs
Kotaro Terada created NIFI-5752: --- Summary: Load balancing fails with wildcard certs Key: NIFI-5752 URL: https://issues.apache.org/jira/browse/NIFI-5752 Project: Apache NiFi Issue Type: Bug Affects Versions: 1.8.0 Reporter: Kotaro Terada Load balancing fails when we construct a secure cluster with wildcard certs. For example, assume that we have a valid wildcard cert for {{*.example.com}} and a cluster consists of {{nf1.example.com}}, {{nf2.example.com}}, and {{nf3.example.com}} . We cannot transfer a FlowFile between nodes for load balancing because of the following authorization error: {noformat} 2018-10-25 19:05:13,520 WARN [Load Balance Server Thread-2] o.a.n.c.q.c.s.ClusterLoadBalanceAuthorizer Authorization failed for Client ID's [*.example.com] to Load Balance data because none of the ID's are known Cluster Node Identifiers 2018-10-25 19:05:13,521 ERROR [Load Balance Server Thread-2] o.a.n.c.q.c.s.ConnectionLoadBalanceServer Failed to communicate with Peer /xxx.xxx.xxx.xxx:x org.apache.nifi.controller.queue.clustered.server.NotAuthorizedException: Client ID's [*.example.com] are not authorized to Load Balance data at org.apache.nifi.controller.queue.clustered.server.ClusterLoadBalanceAuthorizer.authorize(ClusterLoadBalanceAuthorizer.java:65) at org.apache.nifi.controller.queue.clustered.server.StandardLoadBalanceProtocol.receiveFlowFiles(StandardLoadBalanceProtocol.java:142) at org.apache.nifi.controller.queue.clustered.server.ConnectionLoadBalanceServer$CommunicateAction.run(ConnectionLoadBalanceServer.java:176) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) {noformat} This problem occurs because, in the {{authorize}} method of the {{ClusterLoadBalanceAuthorizer}} class, authorization is attempted by simply comparing identity strings for equality, so a wildcard identity such as {{*.example.com}} never matches a concrete node identifier. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
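A minimal illustration of the failure mode: with exact string comparison, the wildcard identity taken from the certificate can never equal a concrete node identifier, while a wildcard-aware comparison accepts it. This is a sketch in the style of common TLS hostname-matching practice, not the actual fix merged for NIFI-5752:

```java
import java.util.List;

// Demonstrates why exact matching rejects wildcard certs, and what a
// wildcard-aware check could look like (single wildcard covering only the
// leftmost label, as TLS hostname matching conventionally allows).
public class WildcardIdentityMatch {

    static boolean exactMatch(String clientId, List<String> nodeIds) {
        return nodeIds.contains(clientId); // what plain string matching does
    }

    static boolean wildcardMatch(String clientId, List<String> nodeIds) {
        if (!clientId.startsWith("*.")) {
            return nodeIds.contains(clientId); // non-wildcard identities fall back to equality
        }
        String suffix = clientId.substring(1); // e.g. ".example.com"
        return nodeIds.stream().anyMatch(node ->
                node.endsWith(suffix)
                        // the wildcard must match exactly one label: reject a.b.example.com
                        && !node.substring(0, node.length() - suffix.length()).contains("."));
    }

    public static void main(String[] args) {
        List<String> nodes = List.of("nf1.example.com", "nf2.example.com", "nf3.example.com");
        System.out.println(exactMatch("*.example.com", nodes));    // false: the reported bug
        System.out.println(wildcardMatch("*.example.com", nodes)); // true
    }
}
```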
[jira] [Updated] (NIFI-5751) Use the GetFile Processor can't get a complete file
[ https://issues.apache.org/jira/browse/NIFI-5751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] weijia.hao updated NIFI-5751: - Description: I use getFile and putFile processor to realize mv function. All configurations are default. when I mv a file to the GetFile processor's Input directory.This file will be completely moved to the putFile processor's output Directory. But when I cp a file to processor's Input Directory slowly,this file will be cut into pieces . for example,I put a 1.2G file into Input directory, When the process is over,there are only 2M of files in the output directory.How do I move a complete file? !input.png!!output.png!!processer.png! !putfile.png!!filesize.png! was: I use getFile and putFile processor to realize mv function. All configurations are default. when I mv a file to the GetFile processor's Input directory.This file will be completely moved to the putFile processor's output Directory. But when I cop a file to processor's Input Directory slowly,this file will be cut into pieces . for example,I put a 1.2G file into Input directory, When the process is over,there are only 2M of files in the output directory.How do I move a complete file? !input.png!!output.png!!processer.png! !putfile.png!!filesize.png! > Use the GetFile Processor can't get a complete file > --- > > Key: NIFI-5751 > URL: https://issues.apache.org/jira/browse/NIFI-5751 > Project: Apache NiFi > Issue Type: Bug >Affects Versions: 1.7.1 > Environment: centos7 >Reporter: weijia.hao >Priority: Major > Attachments: filesize.png, input.png, output.png, processer.png, > putfile.png > > > I use getFile and putFile processor to realize mv function. All > configurations are default. when I mv a file to the GetFile processor's Input > directory.This file will be completely moved to the putFile processor's > output Directory. But when I cp a file to processor's Input Directory > slowly,this file will be cut into pieces . 
for example,I put a 1.2G file into > Input directory, When the process is over,there are only 2M of files in the > output directory.How do I move a complete file? > !input.png!!output.png!!processer.png! > > !putfile.png!!filesize.png! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-5746) The SEND and RECEIVE provenance events for load balancing do not have the same transit uri syntax
[ https://issues.apache.org/jira/browse/NIFI-5746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663406#comment-16663406 ] ASF GitHub Bot commented on NIFI-5746: -- Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/3109#discussion_r228061483
--- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java ---
@@ -130,17 +130,14 @@ public void receiveFlowFiles(final Socket socket) throws IOException {
         final Set certIdentities;
         try {
             certIdentities = getCertificateIdentities(sslSession);
-
-            final String dn = CertificateUtils.extractPeerDNFromSSLSocket(socket);
-            peerDescription = CertificateUtils.extractUsername(dn);
         } catch (final CertificateException e) {
             throw new IOException("Failed to extract Client Certificate", e);
         }

         logger.debug("Connection received from peer {}. Will perform authorization against Client Identities '{}'", peerDescription, certIdentities);

-        authorizer.authorize(certIdentities);
+        peerDescription = authorizer.authorize(certIdentities);
--- End diff -- Although the commit message says "Use Node Identifier's node address instead of getting from socket for RECEIVE prov events", we still use the `nodename` for RECEIVE provenance events [1], which is derived from `socket.getInetAddress().getHostName()` [2]. I wonder if you intended to use this peerDescription instead. Thoughts? 1. https://github.com/apache/nifi/blob/c5e79da4449db81119ab898f15ab7c2aa64b9c91/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java#L343 2. https://github.com/apache/nifi/blob/c5e79da4449db81119ab898f15ab7c2aa64b9c91/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java#L155 > The SEND and RECEIVE provenance events for load balancing do not have the > same transit uri syntax > - > > Key: NIFI-5746 > URL: https://issues.apache.org/jira/browse/NIFI-5746 > Project: Apache NiFi > Issue Type: Bug > Components: Core Framework >Affects Versions: 1.8.0 >Reporter: Mark Payne >Assignee: Mark Payne >Priority: Major > Fix For: 1.9.0 > > > The SEND event has a transit uri like nifi:connection:<connection uuid> > The RECEIVE event has a transit uri like nifi://<node address>/loadbalance/<connection uuid> > The RECEIVE event is much preferred, as it indicates not only that the > transfer was via load balance but also includes the address of the node and > the UUID of the connection. The SEND Transit URI should be changed to > nifi://<node address>/loadbalance/<connection uuid> -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] nifi pull request #3109: NIFI-5746: Use Node Identifier's node address inste...
Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/3109#discussion_r228061483
--- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java ---
@@ -130,17 +130,14 @@ public void receiveFlowFiles(final Socket socket) throws IOException {
         final Set certIdentities;
         try {
             certIdentities = getCertificateIdentities(sslSession);
-
-            final String dn = CertificateUtils.extractPeerDNFromSSLSocket(socket);
-            peerDescription = CertificateUtils.extractUsername(dn);
         } catch (final CertificateException e) {
             throw new IOException("Failed to extract Client Certificate", e);
         }

         logger.debug("Connection received from peer {}. Will perform authorization against Client Identities '{}'", peerDescription, certIdentities);

-        authorizer.authorize(certIdentities);
+        peerDescription = authorizer.authorize(certIdentities);
--- End diff -- Although the commit message says "Use Node Identifier's node address instead of getting from socket for RECEIVE prov events", we still use the `nodename` for RECEIVE provenance events [1], which is derived from `socket.getInetAddress().getHostName()` [2]. I wonder if you intended to use this peerDescription instead. Thoughts? 1. https://github.com/apache/nifi/blob/c5e79da4449db81119ab898f15ab7c2aa64b9c91/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java#L343 2. https://github.com/apache/nifi/blob/c5e79da4449db81119ab898f15ab7c2aa64b9c91/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java#L155 ---
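The shape of the change under discussion (having authorize() return the identity it actually matched, so the caller reuses the same value as the peer description) can be sketched roughly as below. The class and field names here are simplified stand-ins, not the actual NiFi implementation:

```java
import java.util.List;

// Simplified stand-in for the authorizer discussed in the diff: instead of a
// void authorize() that only throws on failure, return the identity that
// matched a known cluster node, so logging and provenance use one value.
public class MatchingAuthorizer {
    private final List<String> knownNodeIds;

    public MatchingAuthorizer(List<String> knownNodeIds) {
        this.knownNodeIds = knownNodeIds;
    }

    // Mirrors the PR change "peerDescription = authorizer.authorize(certIdentities)":
    // the matched identity is handed back to the caller.
    public String authorize(List<String> certIdentities) {
        return certIdentities.stream()
                .filter(knownNodeIds::contains)
                .findFirst()
                .orElseThrow(() -> new IllegalStateException(
                        "Client IDs " + certIdentities + " are not authorized"));
    }
}
```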
[jira] [Updated] (NIFI-5751) Use the GetFile Processor can't get a complete file
[ https://issues.apache.org/jira/browse/NIFI-5751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] weijia.hao updated NIFI-5751: - Summary: Use the GetFile Processor can't get a complete file (was: User the GetFile Processor can't get a complete file) > Use the GetFile Processor can't get a complete file > --- > > Key: NIFI-5751 > URL: https://issues.apache.org/jira/browse/NIFI-5751 > Project: Apache NiFi > Issue Type: Bug >Affects Versions: 1.7.1 > Environment: centos7 >Reporter: weijia.hao >Priority: Major > Attachments: filesize.png, input.png, output.png, processer.png, > putfile.png > > > I use getFile and putFile processor to realize mv function. All > configurations are default. when I mv a file to the GetFile processor's Input > directory.This file will be completely moved to the putFile processor's > output Directory. But when I cop a file to processor's Input Directory > slowly,this file will be cut into pieces . for example,I put a 1.2G file into > Input directory, When the process is over,there are only 2M of files in the > output directory.How do I move a complete file? > !input.png!!output.png!!processer.png! > > !putfile.png!!filesize.png! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (NIFI-5751) User the GetFile Processor can't get a complete file
[ https://issues.apache.org/jira/browse/NIFI-5751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] weijia.hao updated NIFI-5751: - Attachment: filesize.png > User the GetFile Processor can't get a complete file > > > Key: NIFI-5751 > URL: https://issues.apache.org/jira/browse/NIFI-5751 > Project: Apache NiFi > Issue Type: Bug >Affects Versions: 1.7.1 > Environment: centos7 >Reporter: weijia.hao >Priority: Major > Attachments: filesize.png, input.png, output.png, processer.png, > putfile.png > > > I use getFile and putFile processor to realize mv function. All > configurations are default. when I mv a file to the GetFile processor's Input > directory.This file will be completely moved to the putFile processor's > output Directory. But when I cop a file to processor's Input Directory > slowly,this file will be cut into pieces . for example,I put a 1.2G file into > Input directory, When the process is over,there are only 2M of files in the > output directory.How do I move a complete file? > !input.png!!output.png!!processer.png!!putfile.png! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (NIFI-5751) User the GetFile Processor can't get a complete file
[ https://issues.apache.org/jira/browse/NIFI-5751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] weijia.hao updated NIFI-5751: - Description: I use getFile and putFile processor to realize mv function. All configurations are default. when I mv a file to the GetFile processor's Input directory.This file will be completely moved to the putFile processor's output Directory. But when I cop a file to processor's Input Directory slowly,this file will be cut into pieces . for example,I put a 1.2G file into Input directory, When the process is over,there are only 2M of files in the output directory.How do I move a complete file? !input.png!!output.png!!processer.png! !putfile.png!!filesize.png! was:I use getFile and putFile processor to realize mv function. All configurations are default. when I mv a file to the GetFile processor's Input directory.This file will be completely moved to the putFile processor's output Directory. But when I cop a file to processor's Input Directory slowly,this file will be cut into pieces . for example,I put a 1.2G file into Input directory, When the process is over,there are only 2M of files in the output directory.How do I move a complete file? !input.png!!output.png!!processer.png!!putfile.png! > User the GetFile Processor can't get a complete file > > > Key: NIFI-5751 > URL: https://issues.apache.org/jira/browse/NIFI-5751 > Project: Apache NiFi > Issue Type: Bug >Affects Versions: 1.7.1 > Environment: centos7 >Reporter: weijia.hao >Priority: Major > Attachments: filesize.png, input.png, output.png, processer.png, > putfile.png > > > I use getFile and putFile processor to realize mv function. All > configurations are default. when I mv a file to the GetFile processor's Input > directory.This file will be completely moved to the putFile processor's > output Directory. But when I cop a file to processor's Input Directory > slowly,this file will be cut into pieces . 
for example,I put a 1.2G file into > Input directory, When the process is over,there are only 2M of files in the > output directory.How do I move a complete file? > !input.png!!output.png!!processer.png! > > !putfile.png!!filesize.png! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (NIFI-5751) User the GetFile Processor can't get a complete file
[ https://issues.apache.org/jira/browse/NIFI-5751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] weijia.hao updated NIFI-5751: - Description: I use getFile and putFile processor to realize mv function. All configurations are default. when I mv a file to the GetFile processor's Input directory.This file will be completely moved to the putFile processor's output Directory. But when I cop a file to processor's Input Directory slowly,this file will be cut into pieces . for example,I put a 1.2G file into Input directory, When the process is over,there are only 2M of files in the output directory.How do I move a complete file? !input.png!!output.png!!processer.png!!putfile.png! (was: I use getFile and putFile processor to realize mv function. All configurations are default. when I mv a file to the GetFile processor's Input directory.This file will be completely moved to the putFile processor's output Directory. But when I cop a file to processor's Input Directory slowly,this file will be cut into pieces . for example,I put a 1.2G file into Input directory, When the process is over,there are only 2M of files in the output directory.How do I move a complete file?) > User the GetFile Processor can't get a complete file > > > Key: NIFI-5751 > URL: https://issues.apache.org/jira/browse/NIFI-5751 > Project: Apache NiFi > Issue Type: Bug >Affects Versions: 1.7.1 > Environment: centos7 >Reporter: weijia.hao >Priority: Major > Attachments: input.png, output.png, processer.png, putfile.png > > > I use getFile and putFile processor to realize mv function. All > configurations are default. when I mv a file to the GetFile processor's Input > directory.This file will be completely moved to the putFile processor's > output Directory. But when I cop a file to processor's Input Directory > slowly,this file will be cut into pieces . 
> For example, I put a 1.2 GB file into the Input Directory; when the process is over, there are only 2 MB of files in the output directory. How do I move a complete file?
> !input.png!!output.png!!processer.png!!putfile.png!
[jira] [Created] (NIFI-5751) User of the GetFile Processor can't get a complete file
weijia.hao created NIFI-5751:

Summary: User of the GetFile Processor can't get a complete file
Key: NIFI-5751
URL: https://issues.apache.org/jira/browse/NIFI-5751
Project: Apache NiFi
Issue Type: Bug
Affects Versions: 1.7.1
Environment: centos7
Reporter: weijia.hao
Attachments: input.png, output.png, processer.png, putfile.png

I use the GetFile and PutFile processors to implement a move (mv) function. All configurations are default. When I mv a file into the GetFile processor's Input Directory, the file is moved completely to the PutFile processor's Output Directory. But when I copy a file into the Input Directory slowly, the file is cut into pieces. For example, I put a 1.2 GB file into the Input Directory; when the process is over, there are only 2 MB of files in the output directory. How do I move a complete file?
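The behavior described above is the classic symptom of the watcher picking up a file that is still being written: `mv` on the same filesystem is a single atomic rename, while a slow `cp` creates the file first and then streams bytes into it, so GetFile can list and consume it mid-copy. A minimal sketch of the usual workaround, assuming hypothetical staging/input directories (not real NiFi configuration): finish writing outside the watched directory, then rename the complete file in atomically.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Sketch: deliver a file to a GetFile-watched directory atomically.
// The directory names are placeholders for illustration only.
public class AtomicDrop {

    // Writes the payload to a staging directory (NOT watched), then renames
    // it into the watched input directory in one atomic step, so a watcher
    // can only ever observe the complete file. Returns the final file size.
    static long drop(byte[] payload) {
        try {
            Path staging = Files.createTempDirectory("staging"); // not watched
            Path input = Files.createTempDirectory("input");     // watched by GetFile
            Path tmp = staging.resolve("big.dat");
            Files.write(tmp, payload);                           // the slow copy happens here
            Path done = input.resolve("big.dat");
            // Single rename on the same filesystem: atomic, never partial.
            Files.move(tmp, done, StandardCopyOption.ATOMIC_MOVE);
            return Files.size(done);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(drop(new byte[1024])); // prints 1024
    }
}
```

If copying from another filesystem (where rename cannot be atomic), the equivalent pattern is to copy to a dotted or suffixed temporary name in the target directory and rename at the end, combined with GetFile's file-filter settings so in-progress names are ignored.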
[jira] [Updated] (NIFI-5750) Tests shouldn't rely on localized data
[ https://issues.apache.org/jira/browse/NIFI-5750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Arpad Boda updated NIFI-5750:
-
Summary: Tests shouldn't rely on localized data (was: Tests shouldn't rely on localised data)

> Tests shouldn't rely on localized data
>
> Key: NIFI-5750
> URL: https://issues.apache.org/jira/browse/NIFI-5750
> Project: Apache NiFi
> Issue Type: Bug
> Components: Core Framework
> Affects Versions: 1.8.0
> Environment: Ubuntu 18.04.1 LTS Hungarian
> Reporter: Arpad Boda
> Priority: Major
>
> The system I used for testing is Hungarian, which made NiFi tests fail due to localized data:
> _[ERROR] Tests run: 13, Failures: 6, Errors: 0, Skipped: 0, Time elapsed: 65.906 s <<< FAILURE! - in org.apache.nifi.processors.poi.ConvertExcelToCSVProcessorTest_
> _[ERROR] testSkipRows(org.apache.nifi.processors.poi.ConvertExcelToCSVProcessorTest) Time elapsed: 0.727 s <<< FAILURE!_
> _org.junit.ComparisonFailure:_
> _expected:<1234[.46,12:00:00 PM,£ 123.45_
> _1234.5,*+Sunday\, January+* 01\, 2017,¥ 123.45_
> _1\,234.46,1/1/17 12:00,$ 1\,023.45_
> _1\,234.4560,12:00 PM,£ 1\,023.45_
> _9.88E+08,2017/01/01/ 12:00,¥ 1\,023.45_
> _9.877E+08,,_
> _9.]8765E+08,,_
> _> but was:<1234[\,46,12:00:00 DU,£ 123\,45_
> _1234\,5,+*vasárnap\, január*+ 01\, 2017,¥ 123\,45_
> _1 234\,46,1/1/17 12:00,$ 1 023\,45_
> _1 234\,4560,12:00 DU,£ 1 023\,45_
> _9\,88E+08,2017/01/01/ 12:00,¥ 1 023\,45_
> _9\,877E+08,,_
> _9\,]8765E+08,,_
> _>_
[jira] [Created] (NIFI-5750) Tests shouldn't rely on localised data
Arpad Boda created NIFI-5750:

Summary: Tests shouldn't rely on localised data
Key: NIFI-5750
URL: https://issues.apache.org/jira/browse/NIFI-5750
Project: Apache NiFi
Issue Type: Bug
Components: Core Framework
Affects Versions: 1.8.0
Environment: Ubuntu 18.04.1 LTS Hungarian
Reporter: Arpad Boda

The system I used for testing is Hungarian, which made NiFi tests fail due to localised data:

_[ERROR] Tests run: 13, Failures: 6, Errors: 0, Skipped: 0, Time elapsed: 65.906 s <<< FAILURE! - in org.apache.nifi.processors.poi.ConvertExcelToCSVProcessorTest_
_[ERROR] testSkipRows(org.apache.nifi.processors.poi.ConvertExcelToCSVProcessorTest) Time elapsed: 0.727 s <<< FAILURE!_
_org.junit.ComparisonFailure:_
_expected:<1234[.46,12:00:00 PM,£ 123.45_
_1234.5,*+Sunday\, January+* 01\, 2017,¥ 123.45_
_1\,234.46,1/1/17 12:00,$ 1\,023.45_
_1\,234.4560,12:00 PM,£ 1\,023.45_
_9.88E+08,2017/01/01/ 12:00,¥ 1\,023.45_
_9.877E+08,,_
_9.]8765E+08,,_
_> but was:<1234[\,46,12:00:00 DU,£ 123\,45_
_1234\,5,+*vasárnap\, január*+ 01\, 2017,¥ 123\,45_
_1 234\,46,1/1/17 12:00,$ 1 023\,45_
_1 234\,4560,12:00 DU,£ 1 023\,45_
_9\,88E+08,2017/01/01/ 12:00,¥ 1 023\,45_
_9\,877E+08,,_
_9\,]8765E+08,,_
_>_
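The diff in the failure above is pure locale formatting: the expected values use US conventions (`1,234.46`, `12:00 PM`, `Sunday, January`), while the Hungarian machine produces decimal commas, space grouping, `DU`, and `vasárnap, január`. A minimal sketch of the underlying mechanism, and of the fix (pin an explicit `Locale` in the test, or set `Locale.setDefault` in test setup, instead of relying on the build machine's configuration):

```java
import java.text.NumberFormat;
import java.util.Locale;

// Sketch: the same double formats differently depending on the locale,
// which is exactly why assertions against default-locale output break
// on a Hungarian system.
public class LocaleSafeFormat {

    // Formats a number with an explicitly chosen locale rather than
    // the JVM default, making the result machine-independent.
    static String format(double value, Locale locale) {
        return NumberFormat.getNumberInstance(locale).format(value);
    }

    public static void main(String[] args) {
        System.out.println(format(1234.46, Locale.US));              // 1,234.46
        System.out.println(format(1234.46, new Locale("hu", "HU"))); // decimal comma, space grouping
    }
}
```

The same applies to dates and times (`DateFormat`/`DateTimeFormatter` also take an explicit `Locale`); any test that compares formatted output should construct its formatters this way.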
[jira] [Created] (NIFI-5749) NiFi requirements include "Java 8 or later", but doesn't compile with Java 9
Arpad Boda created NIFI-5749:

Summary: NiFi requirements include "Java 8 or later", but doesn't compile with Java 9
Key: NIFI-5749
URL: https://issues.apache.org/jira/browse/NIFI-5749
Project: Apache NiFi
Issue Type: Bug
Components: Tools and Build
Affects Versions: 1.8.0
Environment: Ubuntu 18.04.1 LTS, AMD64
Reporter: Arpad Boda

The README.md file attached to NiFi release 1.8.0 contains the following:

_## Requirements_
_JDK 1.8 or newer_

However, NiFi doesn't compile with Java 9:

_[ERROR] Failed to execute goal org.asciidoctor:asciidoctor-maven-plugin:1.5.2:process-asciidoc (output-html) on project nifi-docs: Execution output-html of goal org.asciidoctor:asciidoctor-maven-plugin:1.5.2:process-asciidoc failed: (LoadError) load error: jruby/java/java_ext/java.lang -- java.lang.reflect.InaccessibleObjectException: Cannot make a non-public member of class java.lang.reflect.AccessibleObject accessible -> [Help 1]_

Either the README content or the compilation error should be fixed.
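One way to address the README side of this mismatch is for the build to detect an unsupported JDK up front and fail with a clear message instead of the JRuby stack trace; in Maven this is normally done with the enforcer plugin's `requireJavaVersion` rule. The class below is only a minimal, hedged illustration of the version check itself, not NiFi's actual build logic:

```java
// Sketch: fail fast on an unsupported JDK. Java 8 reports the
// specification version as "1.8"; Java 9 and later report "9", "10", ...
public class RequireJdk8 {

    // Returns true only for a Java 8 JVM, matching the behavior the
    // reporter observed (the 1.8.0 build works on 8 but not on 9).
    static boolean isSupportedJdk(String specVersion) {
        return specVersion.startsWith("1.8");
    }

    public static void main(String[] args) {
        String v = System.getProperty("java.specification.version");
        if (!isSupportedJdk(v)) {
            System.err.println("This build requires JDK 1.8; found JDK " + v);
        }
    }
}
```

A check like this (or the equivalent enforcer rule) would turn the confusing `InaccessibleObjectException` into an actionable error, until the asciidoctor-maven-plugin dependency is upgraded to a Java 9-compatible version.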