[GitHub] nifi pull request #563: NIFI-2078, 2363, 2364: External state management. CL...

2016-07-31 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/563#discussion_r72900441
  
--- Diff: 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/pom.xml ---
@@ -35,6 +35,10 @@
 nifi-utils
 
 
+org.apache.nifi
+nifi-expression-language
--- End diff --

We may be able to improve the framework to provide onPropertyModified a 
PropertyValue which is already setup. I think valuable registry can be useful 
with env configs like broker address. Raised an issue 
[NIFI-2364](https://issues.apache.org/jira/browse/NIFI-2364).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request #563: NIFI-2078, 2363, 2364: External state management. CL...

2016-07-31 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/563#discussion_r72899732
  
--- Diff: 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
 ---
@@ -481,4 +493,59 @@ private void releaseFlowFile(FlowFile flowFile, 
ProcessSession session, Map partitionOffsets = 
KafkaUtils.retrievePartitionOffsets(zookeeperConnectionString, topic, groupId);
+
+return new StandardStateMap(partitionOffsets, 
System.currentTimeMillis());
+}
+
+private boolean isReadyToAccessState() {
+return !StringUtils.isEmpty(zookeeperConnectionString)
+&& !StringUtils.isEmpty(topic)
+&& !StringUtils.isEmpty(groupId);
+}
+
+@Override
+public void clearExternalState() throws IOException {
+if (!isReadyToAccessState()) {
+return;
+}
+// Block onTrigger starts creating new consumer until clear offset 
finishes.
+synchronized (this.consumerStreamsReady) {
+KafkaUtils.clearPartitionOffsets(zookeeperConnectionString, 
topic, groupId);
+}
+}
+
+/**
+ * GetKafka overrides this method in order to capture processor's 
property values required when it retrieves
+ * its state managed externally at Kafka. Since view/clear state 
operation can be executed before onTrigger() is called,
+ * we need to capture these values as it's modified. This method is 
also called when NiFi restarts and loads configs,
+ * so users can access external states right after restart of NiFi.
+ * @param descriptor of the modified property
+ * @param oldValue non-null property value (previous)
+ * @param newValue the new property value or if null indicates the 
property
+ */
+@Override
+public void onPropertyModified(PropertyDescriptor descriptor, String 
oldValue, String newValue) {
+if (ZOOKEEPER_CONNECTION_STRING.equals(descriptor)) {
+zookeeperConnectionString = newValue;
+} else if (TOPIC.equals(descriptor)) {
+topic = newValue;
+} else if (GROUP_ID.equals(descriptor)) {
+groupId = newValue;
--- End diff --

Thanks for pointing this out. Addressed.
Fixed bootstrap server address for ConsumeKafka, too.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request #563: NIFI-2078, 2363, 2364: External state management. CL...

2016-07-29 Thread JPercivall
Github user JPercivall commented on a diff in the pull request:

https://github.com/apache/nifi/pull/563#discussion_r72834762
  
--- Diff: 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
 ---
@@ -481,4 +493,59 @@ private void releaseFlowFile(FlowFile flowFile, 
ProcessSession session, Map partitionOffsets = 
KafkaUtils.retrievePartitionOffsets(zookeeperConnectionString, topic, groupId);
+
+return new StandardStateMap(partitionOffsets, 
System.currentTimeMillis());
+}
+
+private boolean isReadyToAccessState() {
+return !StringUtils.isEmpty(zookeeperConnectionString)
+&& !StringUtils.isEmpty(topic)
+&& !StringUtils.isEmpty(groupId);
+}
+
+@Override
+public void clearExternalState() throws IOException {
+if (!isReadyToAccessState()) {
+return;
+}
+// Block onTrigger starts creating new consumer until clear offset 
finishes.
+synchronized (this.consumerStreamsReady) {
+KafkaUtils.clearPartitionOffsets(zookeeperConnectionString, 
topic, groupId);
+}
+}
+
+/**
+ * GetKafka overrides this method in order to capture processor's 
property values required when it retrieves
+ * its state managed externally at Kafka. Since view/clear state 
operation can be executed before onTrigger() is called,
+ * we need to capture these values as it's modified. This method is 
also called when NiFi restarts and loads configs,
+ * so users can access external states right after restart of NiFi.
+ * @param descriptor of the modified property
+ * @param oldValue non-null property value (previous)
+ * @param newValue the new property value or if null indicates the 
property
+ */
+@Override
+public void onPropertyModified(PropertyDescriptor descriptor, String 
oldValue, String newValue) {
+if (ZOOKEEPER_CONNECTION_STRING.equals(descriptor)) {
+zookeeperConnectionString = newValue;
+} else if (TOPIC.equals(descriptor)) {
+topic = newValue;
+} else if (GROUP_ID.equals(descriptor)) {
+groupId = newValue;
--- End diff --

Probably the easiest way to address this is to set groupId when the default 
gets created for the property.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request #563: NIFI-2078, 2363, 2364: External state management. CL...

2016-07-29 Thread JPercivall
Github user JPercivall commented on a diff in the pull request:

https://github.com/apache/nifi/pull/563#discussion_r72829933
  
--- Diff: 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
 ---
@@ -481,4 +493,59 @@ private void releaseFlowFile(FlowFile flowFile, 
ProcessSession session, Map partitionOffsets = 
KafkaUtils.retrievePartitionOffsets(zookeeperConnectionString, topic, groupId);
+
+return new StandardStateMap(partitionOffsets, 
System.currentTimeMillis());
+}
+
+private boolean isReadyToAccessState() {
+return !StringUtils.isEmpty(zookeeperConnectionString)
+&& !StringUtils.isEmpty(topic)
+&& !StringUtils.isEmpty(groupId);
+}
+
+@Override
+public void clearExternalState() throws IOException {
+if (!isReadyToAccessState()) {
+return;
+}
+// Block onTrigger starts creating new consumer until clear offset 
finishes.
+synchronized (this.consumerStreamsReady) {
+KafkaUtils.clearPartitionOffsets(zookeeperConnectionString, 
topic, groupId);
+}
+}
+
+/**
+ * GetKafka overrides this method in order to capture processor's 
property values required when it retrieves
+ * its state managed externally at Kafka. Since view/clear state 
operation can be executed before onTrigger() is called,
+ * we need to capture these values as it's modified. This method is 
also called when NiFi restarts and loads configs,
+ * so users can access external states right after restart of NiFi.
+ * @param descriptor of the modified property
+ * @param oldValue non-null property value (previous)
+ * @param newValue the new property value or if null indicates the 
property
+ */
+@Override
+public void onPropertyModified(PropertyDescriptor descriptor, String 
oldValue, String newValue) {
+if (ZOOKEEPER_CONNECTION_STRING.equals(descriptor)) {
+zookeeperConnectionString = newValue;
+} else if (TOPIC.equals(descriptor)) {
+topic = newValue;
+} else if (GROUP_ID.equals(descriptor)) {
+groupId = newValue;
--- End diff --

I ran into an error due to how GroupId is getting set. This only gets 
called if someone modifies the groupid but it has a default value. So it will 
be null if the user never modifies it and they will hit this error:

2016-07-29 13:24:12,283 WARN [Timer-Driven Process Thread-6] 
o.apache.nifi.processors.kafka.GetKafka 
GetKafka[id=3798f2c6-0156-1000--138083b7] Processor Administratively 
Yielded for 1 sec due to processing failure
2016-07-29 13:24:12,283 WARN [Timer-Driven Process Thread-6] 
o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding 
GetKafka[id=3798f2c6-0156-1000--138083b7] due to uncaught Exception: 
java.lang.IllegalStateException: java.util.concurrent.ExecutionException: 
java.lang.NullPointerException
2016-07-29 13:24:12,284 WARN [Timer-Driven Process Thread-6] 
o.a.n.c.t.ContinuallyRunProcessorTask 
java.lang.IllegalStateException: java.util.concurrent.ExecutionException: 
java.lang.NullPointerException
at 
org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:367) ~[na:na]
at 
org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
 ~[nifi-api-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
at 
org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1060)
 ~[nifi-framework-core-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
at 
org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136)
 [nifi-framework-core-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
at 
org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
 [nifi-framework-core-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
at 
org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123)
 [nifi-framework-core-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
[na:1.8.0_74]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) 
[na:1.8.0_74]
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
 [na:1.8.0_74]
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
 [na:1.8.0_74]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
[na:1.8.0_74]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
[na:1.8.0_74]
 

[GitHub] nifi pull request #563: NIFI-2078, 2363, 2364: External state management. CL...

2016-07-28 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/563#discussion_r72577880
  
--- Diff: 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
 ---
@@ -54,7 +72,12 @@
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
 @CapabilityDescription("Consumes messages from Apache Kafka")
 @Tags({ "Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume" 
})
-public class ConsumeKafka extends AbstractKafkaProcessor> {
+@Stateful(scopes = {Scope.EXTERNAL}, description = "After consuming 
messages, ConsumeKafka commits its offset information to Kafka" +
+" so that the state of a consumer group can be retained across 
events such as consumer reconnect." +
+" Offsets can be cleared when there is no consumer subscribing 
with the same consumer group id." +
+" It may take more than 30 seconds for a consumer group to become 
able to be cleared after it is stopped from NiFi." +
+" Once offsets are cleared, ConsumeKafka will resume consuming 
messages based on Offset Reset configuration.")
+public class ConsumeKafka extends AbstractKafkaProcessor> implements ExternalStateManager {
--- End diff --

Added documentation in Developer Guide.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request #563: NIFI-2078, 2363, 2364: External state management. CL...

2016-07-27 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/563#discussion_r72562357
  
--- Diff: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
 ---
@@ -91,7 +92,7 @@ public ClusterProtocolHeartbeatMonitor(final 
ClusterCoordinator clusterCoordinat
 this.clusterNodesPath = 
zkClientConfig.resolvePath("cluster/nodes");
 
 String hostname = 
properties.getProperty(NiFiProperties.CLUSTER_NODE_ADDRESS);
-if (hostname == null) {
+if (StringUtils.isEmpty(hostname)) {
--- End diff --

Thanks for pointing this out. I will remove these changes from this PR. 
Posted my comments on #688.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request #563: NIFI-2078, 2363, 2364: External state management. CL...

2016-07-27 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/563#discussion_r72559943
  
--- Diff: 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/pom.xml ---
@@ -35,6 +35,10 @@
 nifi-utils
 
 
+org.apache.nifi
+nifi-expression-language
--- End diff --

BOOTSTRAP_SERVERS, TOPIC, CLIENT_ID are the properties shared among 
Consume/ProduceKafka. If we can separate and let Consume, ProduceKafka have 
those as its own property, ProduceKafka to support EL but not ConsumeKafka, 
then the dependency can be removed.

But existing flow configuration may have to be updated if one uses EL for 
those properties of ConsumeKafka. 

@olegz @JPercivall How do you think?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request #563: NIFI-2078, 2363, 2364: External state management. CL...

2016-07-27 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/563#discussion_r72559265
  
--- Diff: 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/pom.xml ---
@@ -35,6 +35,10 @@
 nifi-utils
 
 
+org.apache.nifi
+nifi-expression-language
--- End diff --

I added EL dependency to use StandardPropertyValue class at 
ConsumeKafka.onPropertyModified.

Since ConsumeKafka allows EL for properties that needed to get state from 
Kafka, EL has to be evaluated at onPropertyModified, too. But to do so, we need 
PropertyValue instance, which is not available at onPropertyModified because it 
only receives String representation of oldValue and newValue. An implementation 
class of PropertyValue is not included in nifi-api. So, I needed to add the EL 
dependency.

However, I felt it's a little bit strange to support EL for things like 
'topic' or 'bootstrap_servers', since ConsumeKafka doesn't take input flow 
files, and these properties can not be changed after ConsumerKafka connects to 
Kafka. It keep using the same kafkaResource instance. So, those property values 
are more static, rather than dynamically evaluated. It makes sense to support 
EL for those values for PublishKafka, though.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request #563: NIFI-2078, 2363, 2364: External state management. CL...

2016-07-27 Thread ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/563#discussion_r72557366
  
--- Diff: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ClearComponentStateEndpointMerger.java
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.entity.ClearComponentStateResultEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+public class ClearComponentStateEndpointMerger extends 
AbstractSingleEntityEndpoint {
+public static final Pattern PROCESSOR_STATE_URI_PATTERN = 
Pattern.compile("/nifi-api/processors/[a-f0-9\\-]{36}/state/clear-requests");
+public static final Pattern CONTROLLER_SERVICE_STATE_URI_PATTERN = 
Pattern.compile("/nifi-api/controller-services/[a-f0-9\\-]{36}/state/clear-requests");
+public static final Pattern REPORTING_TASK_STATE_URI_PATTERN = 
Pattern.compile("/nifi-api/reporting-tasks/[a-f0-9\\-]{36}/state/clear-requests");
--- End diff --

Yes, that's how it's done today. There may be some way to get URI patterns 
align with corresponding Resource class methods, though.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request #563: NIFI-2078, 2363, 2364: External state management. CL...

2016-07-26 Thread JPercivall
Github user JPercivall commented on a diff in the pull request:

https://github.com/apache/nifi/pull/563#discussion_r72334765
  
--- Diff: 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/pom.xml ---
@@ -35,6 +35,10 @@
 nifi-utils
 
 
+org.apache.nifi
+nifi-expression-language
--- End diff --

This module didn't already have access to EL?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request #563: NIFI-2078, 2363, 2364: External state management. CL...

2016-07-26 Thread JPercivall
Github user JPercivall commented on a diff in the pull request:

https://github.com/apache/nifi/pull/563#discussion_r72330212
  
--- Diff: 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
 ---
@@ -54,7 +72,12 @@
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
 @CapabilityDescription("Consumes messages from Apache Kafka")
 @Tags({ "Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume" 
})
-public class ConsumeKafka extends AbstractKafkaProcessor> {
+@Stateful(scopes = {Scope.EXTERNAL}, description = "After consuming 
messages, ConsumeKafka commits its offset information to Kafka" +
+" so that the state of a consumer group can be retained across 
events such as consumer reconnect." +
+" Offsets can be cleared when there is no consumer subscribing 
with the same consumer group id." +
+" It may take more than 30 seconds for a consumer group to become 
able to be cleared after it is stopped from NiFi." +
+" Once offsets are cleared, ConsumeKafka will resume consuming 
messages based on Offset Reset configuration.")
+public class ConsumeKafka extends AbstractKafkaProcessor> implements ExternalStateManager {
--- End diff --

Since "ExternalStateManager" is something that the developer will need to 
implement if they will use it, this should be documented in the Developer guide


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request #563: NIFI-2078, 2363, 2364: External state management. CL...

2016-07-26 Thread JPercivall
Github user JPercivall commented on a diff in the pull request:

https://github.com/apache/nifi/pull/563#discussion_r72326199
  
--- Diff: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/ClearComponentStateEndpointMerger.java
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.http.endpoints;
+
+import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.web.api.entity.ClearComponentStateResultEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+public class ClearComponentStateEndpointMerger extends 
AbstractSingleEntityEndpoint {
+public static final Pattern PROCESSOR_STATE_URI_PATTERN = 
Pattern.compile("/nifi-api/processors/[a-f0-9\\-]{36}/state/clear-requests");
+public static final Pattern CONTROLLER_SERVICE_STATE_URI_PATTERN = 
Pattern.compile("/nifi-api/controller-services/[a-f0-9\\-]{36}/state/clear-requests");
+public static final Pattern REPORTING_TASK_STATE_URI_PATTERN = 
Pattern.compile("/nifi-api/reporting-tasks/[a-f0-9\\-]{36}/state/clear-requests");
--- End diff --

Ah I see that's how it's done in all the classes in the same package


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request #563: NIFI-2078, 2363, 2364: External state management. CL...

2016-07-26 Thread JPercivall
Github user JPercivall commented on a diff in the pull request:

https://github.com/apache/nifi/pull/563#discussion_r72325932
  
--- Diff: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.java
 ---
@@ -91,7 +92,7 @@ public ClusterProtocolHeartbeatMonitor(final 
ClusterCoordinator clusterCoordinat
 this.clusterNodesPath = 
zkClientConfig.resolvePath("cluster/nodes");
 
 String hostname = 
properties.getProperty(NiFiProperties.CLUSTER_NODE_ADDRESS);
-if (hostname == null) {
+if (StringUtils.isEmpty(hostname)) {
--- End diff --

This change will conflict with @markap14 change in this PR: 
https://github.com/apache/nifi/pull/688/files


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request #563: NIFI-2078, 2363, 2364: External state management. CL...

2016-07-26 Thread JPercivall
Github user JPercivall commented on a diff in the pull request:

https://github.com/apache/nifi/pull/563#discussion_r72325489
  
--- Diff: 
nifi-api/src/main/java/org/apache/nifi/components/state/ExternalStateManager.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.components.state;
+
+import org.apache.nifi.annotation.behavior.Stateful;
+
+import java.io.IOException;
+
+/**
+ * 
+ * The ExternalStateManager is responsible for providing NiFi a mechanism 
for retrieving
+ * and clearing state stored in an external system a NiFi component 
interact with.
+ * 
+ *
+ * 
+ * When calling methods in this class, the state is always 
retrieved/cleared from external system
+ * regardless NiFi instance is a part of a cluster or standalone.
+ * 
+ *
+ * 
+ * This mechanism is designed to allow developers to easily store and 
retrieve small amounts of state.
+ * Since implementation of this interface interacts with remote system, 
one should consider the cost of
+ * retrieving this data, and the amount of data should be kept to the 
minimum required.
--- End diff --

This paragraph doesn't make sense for External state since the user 
probably isn't choosing what to store


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---