[GitHub] nifi pull request #563: NIFI-2078: External state management.

2016-07-06 Thread JPercivall
Github user JPercivall commented on a diff in the pull request:

https://github.com/apache/nifi/pull/563#discussion_r69829659
  
--- Diff: 
nifi-api/src/main/java/org/apache/nifi/components/state/ExternalStateManager.java
 ---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.components.state;
+
+import org.apache.nifi.annotation.behavior.Stateful;
+
+import java.io.IOException;
+
+/**
+ * 
+ * The ExternalStateManager is responsible for providing NiFi a mechanism 
for retrieving
+ * and clearing state stored in an external system a NiFi component 
interact with.
+ * 
+ *
+ * 
+ * When calling methods in this class, the state is always 
retrieved/cleared from external system
+ * regardless NiFi instance is a part of a cluster or standalone.
+ * 
+ *
+ * 
+ * This mechanism is designed to allow developers to easily store and 
retrieve small amounts of state.
+ * Since implementation of this interface interacts with remote system, 
one should consider the cost of
+ * retrieving this data, and the amount of data should be kept to the 
minimum required.
+ * 
+ *
+ * 
+ * Any component that wishes to implement ExternalStateManager should also 
use the {@link Stateful} annotation
+ * with {@link Scope#EXTERNAL} to provide a description of what state is 
being stored.
+ * If this annotation is not present, the UI will not expose such 
information or allow DFMs to clear the state.
+ * 
+ */
+public interface ExternalStateManager {
+
+/**
+ * Returns the current state for the component. This return value may 
be null.
+ *
+ * @return the current state for the component or null if there is no 
state is retrieved
+ * @throws IOException if unable to communicate with the underlying 
storage mechanism
+ */
+StateMap getState() throws IOException;
+
+/**
+ * Clears all keys and values from the component's state
+ *
+ * @throws IOException if unable to communicate with the underlying 
storage mechanism
+ */
+void clear() throws IOException;
--- End diff --

This should be renamed to "clearState" to match the corresponding 
"getState" and to better identify the method. Since it's an interface, you 
never know what else the processor may want to clear, and "clear" could end up 
being ambiguous. 
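
A minimal sketch of what the rename could look like, not the actual NiFi API. `ExternalStateManagerSketch` and `InMemoryExternalState` are hypothetical names, and a plain `Map<String, String>` stands in for `StateMap` so the example compiles on its own:

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for the interface under review.
// Map<String, String> replaces NiFi's StateMap to keep the sketch self-contained.
interface ExternalStateManagerSketch {
    Map<String, String> getState() throws IOException;

    // Renamed from clear() so the name says what is being cleared.
    void clearState() throws IOException;
}

// Hypothetical in-memory implementation, used only to exercise the sketch.
class InMemoryExternalState implements ExternalStateManagerSketch {
    private final Map<String, String> offsets = new HashMap<>();

    void put(String key, String value) {
        offsets.put(key, value);
    }

    @Override
    public Map<String, String> getState() {
        // Mirrors the documented contract: null when no state is available.
        return offsets.isEmpty() ? null : new HashMap<>(offsets);
    }

    @Override
    public void clearState() {
        offsets.clear();
    }
}
```

With the paired names, a processor that also clears caches or connections can keep those under their own method names without colliding with the state contract.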


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request #563: NIFI-2078: External state management.

2016-07-06 Thread JPercivall
Github user JPercivall commented on a diff in the pull request:

https://github.com/apache/nifi/pull/563#discussion_r69829390
  
--- Diff: 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
 ---
@@ -239,4 +300,188 @@ private void releaseFlowFile(FlowFile flowFile, 
ProcessContext context, ProcessS
 this.getLogger().info("Successfully received {} from Kafka with {} 
messages in {} millis", new Object[] { flowFile, msgCount, executionDuration });
 session.transfer(flowFile, REL_SUCCESS);
 }
+
+@Override
+public StateMap getState() throws IOException {
+
+if (!isReadyToAccessState()) {
+return null;
+}
+
+final String groupId = 
kafkaProperties.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
+return submitConsumerGroupCommand("Fetch offsets", consumer -> {
+final Map<String, String> partitionOffsets = 
consumer.partitionsFor(topic).stream()
+.map(p -> new TopicPartition(topic, p.partition()))
+.map(tp -> new ImmutablePair<>(tp, 
consumer.committed(tp)))
+.filter(tpo -> tpo.right != null)
+.collect(Collectors.toMap(tpo ->
+"partition:" + tpo.left.partition(),
+tpo -> String.valueOf(tpo.right.offset())));
+
+logger.info("Retrieved offsets from Kafka, topic={}, 
groupId={}, partitionOffsets={}",
+topic, groupId, partitionOffsets);
+
+return new StandardStateMap(partitionOffsets, 
System.currentTimeMillis());
+}, null);
+}
+
+private boolean isReadyToAccessState() {
+if (StringUtils.isEmpty(topic) || StringUtils.isEmpty(brokers)
+|| kafkaProperties == null || 
StringUtils.isEmpty(kafkaProperties.getProperty(ConsumerConfig.GROUP_ID_CONFIG)))
 {
+return false;
+}
+return true;
+}
+
+/**
+ * Clear offsets stored in Kafka, by committing -1 as offset of 
each partitions of specified topic.
+ *
+ * Kafka allows commitSync if one of following conditions are met,
+ * see kafka.coordinator.GroupCoordinator.handleCommitOffsets for 
detail:
+ * 
+ * The consumer is a member of the consumer group. In this case,
+ * even if there's other consumers connecting Kafka, offsets can be 
updated.
+ * It's dangerous to clear offsets if there're active consumers.
+ * When consumer.subscribe() and poll() are called, the consumer will 
be a member of the consumer group.
+ *
+ * There's no connected consumer within the group,
+ * and Kafka GroupCoordinator has marked the group as dead.
+ * It's safer but can take longer.
+ * 
+ *
+ * The consumer group state transition is an async operation at 
Kafka group coordinator.
+ * Although clear() can only be called when the processor is stopped,
+ * the consumer group may not be fully removed at Kafka, in that case, 
CommitFailedException will be thrown.
+ *
+ * Following log msg can be found when GroupCoordinator has marked 
the group as dead
+ * in kafka.out on a Kafka broker server, it can take more than 30 
seconds:
+ * [GroupCoordinator]: Group [gid] generation 1 is dead
+ * and removed (kafka.coordinator.GroupCoordinator)
+ *
+ */
+@Override
+public void clear() throws IOException {
+
+if (!isReadyToAccessState()) {
+return;
+}
+
+final String groupId = 
kafkaProperties.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
+final Boolean result = submitConsumerGroupCommand("Clear offsets", 
consumer -> {
--- End diff --

Same comment as on GetKafka: should this block the onTrigger from getting 
new messages?




[GitHub] nifi pull request #563: NIFI-2078: External state management.

2016-07-06 Thread JPercivall
Github user JPercivall commented on a diff in the pull request:

https://github.com/apache/nifi/pull/563#discussion_r69828743
  
--- Diff: 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java
 ---
@@ -166,4 +170,44 @@ public MessageAndMetadata answer(InvocationOnMock 
invocation) throws Throwable {
 }
 }
 
+@Test
+public void testGetState() throws Exception {
+final GetKafka processor = new GetKafka();
+final TestRunner runner = TestRunners.newTestRunner(processor);
+
+assertNull("State should be null when required properties are not 
specified.", processor.getState());
+
+runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, 
"0.0.0.0:invalid-port");
+runner.setProperty(GetKafka.TOPIC, "testX");
+
+assertNull("State should be null when required properties are not 
specified.", processor.getState());
+
+runner.setProperty(GetKafka.GROUP_ID, "consumer-group-id");
+
+try {
+processor.getState();
+fail("The processor should try to access Zookeeper and should 
fail since it can not connect.");
+} catch (IOException e) {
+}
+}
+
+@Test
+public void testClearState() throws Exception {
+final GetKafka processor = new GetKafka();
+final TestRunner runner = TestRunners.newTestRunner(processor);
+
+// Clear doesn't do anything until required properties are set.
+processor.clear();
+
+runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, 
"0.0.0.0:invalid-port");
+runner.setProperty(GetKafka.TOPIC, "testX");
+runner.setProperty(GetKafka.GROUP_ID, "consumer-group-id");
+
+try {
+processor.getState();
--- End diff --

This is a unit test for clearState, but it calls getState here instead of clear.
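
A rough sketch of the shape the corrected test could take, written without JUnit or NiFi so it stands alone. `UnreachableStateStub` is a hypothetical stand-in for a GetKafka instance whose ZooKeeper connection string is invalid; the real test would use `fail(...)` where the sketch returns `false`:

```java
import java.io.IOException;

// Hypothetical stub standing in for a processor that cannot reach ZooKeeper.
class UnreachableStateStub {
    Object getState() throws IOException {
        throw new IOException("cannot connect");
    }

    void clear() throws IOException {
        throw new IOException("cannot connect");
    }
}

class ClearStateTestSketch {
    // Mirrors the try/fail/catch shape of the test, but exercises clear().
    static boolean clearFailsWithIOException(UnreachableStateStub processor) {
        try {
            processor.clear();
            return false; // the real test would call fail(...) here
        } catch (IOException e) {
            return true; // expected: the connection attempt fails
        }
    }
}
```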




[GitHub] nifi pull request #563: NIFI-2078: External state management.

2016-07-06 Thread olegz
Github user olegz commented on a diff in the pull request:

https://github.com/apache/nifi/pull/563#discussion_r69828024
  
--- Diff: 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
 ---
@@ -481,4 +496,51 @@ private void releaseFlowFile(FlowFile flowFile, 
ProcessSession session, Map partitionOffsets = 
KafkaUtils.retrievePartitionOffsets(zookeeperConnectionString, topic, groupId);
+
+return new StandardStateMap(partitionOffsets, 
System.currentTimeMillis());
+}
+
+private boolean isReadyToAccessState() {
+if(StringUtils.isEmpty(zookeeperConnectionString)
+|| StringUtils.isEmpty(topic)
+|| StringUtils.isEmpty(groupId)) {
+return false;
+}
+return true;
--- End diff --

Wouldn't it be simpler to do:
```java
return !StringUtils.isEmpty(zookeeperConnectionString) && 
!StringUtils.isEmpty(topic) && !StringUtils.isEmpty(groupId);
```
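
A small check that the single-expression form agrees with the if/return version in the diff. Note that the groupId check needs the negation as well, by De Morgan's law. `ReadyCheckSketch` and its local `isEmpty` helper are hypothetical; the helper assumes the usual "null or empty string" semantics of `StringUtils.isEmpty`:

```java
class ReadyCheckSketch {
    // Local stand-in for StringUtils.isEmpty (assumed: true for null or "").
    static boolean isEmpty(String s) {
        return s == null || s.isEmpty();
    }

    // The form from the diff: return false if any value is missing.
    static boolean original(String zk, String topic, String groupId) {
        if (isEmpty(zk) || isEmpty(topic) || isEmpty(groupId)) {
            return false;
        }
        return true;
    }

    // The single-expression form: ready only when all three values are present.
    static boolean simplified(String zk, String topic, String groupId) {
        return !isEmpty(zk) && !isEmpty(topic) && !isEmpty(groupId);
    }
}
```

Both methods return the same result for every combination of null, empty, and non-empty inputs.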




[GitHub] nifi pull request #563: NIFI-2078: External state management.

2016-07-06 Thread JPercivall
Github user JPercivall commented on a diff in the pull request:

https://github.com/apache/nifi/pull/563#discussion_r69827581
  
--- Diff: 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
 ---
@@ -481,4 +496,51 @@ private void releaseFlowFile(FlowFile flowFile, 
ProcessSession session, Map partitionOffsets = 
KafkaUtils.retrievePartitionOffsets(zookeeperConnectionString, topic, groupId);
+
+return new StandardStateMap(partitionOffsets, 
System.currentTimeMillis());
+}
+
+private boolean isReadyToAccessState() {
+if(StringUtils.isEmpty(zookeeperConnectionString)
+|| StringUtils.isEmpty(topic)
+|| StringUtils.isEmpty(groupId)) {
+return false;
+}
+return true;
+}
+
+@Override
+public void clear() throws IOException {
+if (!isReadyToAccessState()) {
+return;
+}
+KafkaUtils.clearPartitionOffsets(zookeeperConnectionString, topic, 
groupId);
--- End diff --

Shouldn't this block the onTrigger from attempting to reach out to Kafka to 
get new messages? 
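
One possible way to do this, sketched under assumptions rather than taken from the PR: guard the two paths with a read/write lock, so concurrent onTrigger tasks share the read lock and a clear takes the write lock. `ClearGuardSketch`, `polls`, and the `Runnable` clear action are hypothetical names; the real processor would call Kafka where the counter is incremented:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class ClearGuardSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final AtomicInteger polls = new AtomicInteger();

    // Stand-in for onTrigger: skips this invocation while a clear is in progress.
    boolean onTrigger() {
        if (!lock.readLock().tryLock()) {
            return false; // a clear holds the write lock; do not reach out to Kafka
        }
        try {
            polls.incrementAndGet(); // placeholder for fetching messages
            return true;
        } finally {
            lock.readLock().unlock();
        }
    }

    // Stand-in for clear(): waits for running triggers, then excludes new ones.
    void clearState(Runnable clearAction) {
        lock.writeLock().lock();
        try {
            clearAction.run();
        } finally {
            lock.writeLock().unlock();
        }
    }

    int polls() {
        return polls.get();
    }
}
```

Using `tryLock()` means a trigger that arrives during a clear returns immediately instead of queuing behind it; whether skipping or waiting is preferable depends on how the framework schedules the processor.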




[GitHub] nifi pull request #563: NIFI-2078: External state management.

2016-07-06 Thread JPercivall
Github user JPercivall commented on a diff in the pull request:

https://github.com/apache/nifi/pull/563#discussion_r69827217
  
--- Diff: 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
 ---
@@ -218,11 +232,13 @@
 }
 
 public void createConsumers(final ProcessContext context) {
-final String topic = context.getProperty(TOPIC).getValue();
+topic = context.getProperty(TOPIC).getValue();
--- End diff --

Why change the three values here? They are already set in "onPropertyModified", 
so they are updated whenever a property changes. 




[GitHub] nifi pull request #563: NIFI-2078: External state management.

2016-07-06 Thread JPercivall
Github user JPercivall commented on a diff in the pull request:

https://github.com/apache/nifi/pull/563#discussion_r69827142
  
--- Diff: 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
 ---
@@ -184,6 +195,9 @@
 private volatile long deadlockTimeout;
 
 private volatile ExecutorService executor;
+private String zookeeperConnectionString;
--- End diff --

These three will be accessed by different threads and should be marked 
volatile. 
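
A minimal sketch of the suggestion, assuming the fields are written by the thread that reports property changes and read by the thread that serves state requests. `StateAccessFieldsSketch` and the string property keys are hypothetical; the real callback receives a `PropertyDescriptor`:

```java
class StateAccessFieldsSketch {
    // volatile makes a write by one thread visible to later reads on other threads.
    private volatile String zookeeperConnectionString;
    private volatile String topic;
    private volatile String groupId;

    // Stand-in for onPropertyModified; the property keys here are made up.
    void onPropertyModified(String propertyKey, String newValue) {
        switch (propertyKey) {
            case "zookeeper":
                zookeeperConnectionString = newValue;
                break;
            case "topic":
                topic = newValue;
                break;
            case "group":
                groupId = newValue;
                break;
            default:
                break;
        }
    }

    boolean isReadyToAccessState() {
        return notEmpty(zookeeperConnectionString) && notEmpty(topic) && notEmpty(groupId);
    }

    private static boolean notEmpty(String s) {
        return s != null && !s.isEmpty();
    }
}
```

volatile gives visibility for each field individually; it does not make an update of all three fields atomic, so a reader may briefly see a mix of old and new values.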




[GitHub] nifi pull request #563: NIFI-2078: External state management.

2016-06-22 Thread ijokarumawak
GitHub user ijokarumawak opened a pull request:

https://github.com/apache/nifi/pull/563

NIFI-2078: External state management.

- Added ExternalStateManager to handle components' state managed
  externally
- Added UI codes to display external state
- Added view/clear functionality to ConsumeKafka
- Added view/clear functionality to GetKafka
- Capture property value change, so that external state can be accessed
  before onTrigger is called

TODO:
- Test with Kerberized Zookeeper and Kafka
- Fix Component State UI issues
- Fix Expression Language of ConsumeKafka properties

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ijokarumawak/nifi nifi-2078

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/563.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #563


commit bb630fae8ce98c89fbfb4122e57bcf7fe83ba35f
Author: Koji Kawamura 
Date:   2016-06-23T02:29:32Z

NIFI-2078: External state management.

- Added ExternalStateManager to handle components' state managed
  externally
- Added UI codes to display external state
- Added view/clear functionality to ConsumeKafka
- Added view/clear functionality to GetKafka
- Capture property value change, so that external state can be accessed
  before onTrigger is called



