Github user ijokarumawak commented on a diff in the pull request: https://github.com/apache/nifi/pull/563#discussion_r72899732 --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java --- @@ -481,4 +493,59 @@ private void releaseFlowFile(FlowFile flowFile, ProcessSession session, Map<Stri session.transfer(flowFile, REL_SUCCESS); } } + + @Override + public ExternalStateScope getExternalStateScope() { + return ExternalStateScope.CLUSTER; + } + + @Override + public StateMap getExternalState() throws IOException { + // We don't have to synchronize with onTrigger here, + // since it merely retrieves state from Zk using different channel, it doesn't affect consuming. + if (!isReadyToAccessState()) { + return null; + } + final Map<String, String> partitionOffsets = KafkaUtils.retrievePartitionOffsets(zookeeperConnectionString, topic, groupId); + + return new StandardStateMap(partitionOffsets, System.currentTimeMillis()); + } + + private boolean isReadyToAccessState() { + return !StringUtils.isEmpty(zookeeperConnectionString) + && !StringUtils.isEmpty(topic) + && !StringUtils.isEmpty(groupId); + } + + @Override + public void clearExternalState() throws IOException { + if (!isReadyToAccessState()) { + return; + } + // Block onTrigger starts creating new consumer until clear offset finishes. + synchronized (this.consumerStreamsReady) { + KafkaUtils.clearPartitionOffsets(zookeeperConnectionString, topic, groupId); + } + } + + /** + * GetKafka overrides this method in order to capture processor's property values required when it retrieves + * its state managed externally at Kafka. Since view/clear state operation can be executed before onTrigger() is called, + * we need to capture these values as it's modified. This method is also called when NiFi restarts and loads configs, + * so users can access external states right after restart of NiFi. 
+ * @param descriptor of the modified property + * @param oldValue non-null property value (previous) + * @param newValue the new property value or if null indicates the property + */ + @Override + public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { + if (ZOOKEEPER_CONNECTION_STRING.equals(descriptor)) { + zookeeperConnectionString = newValue; + } else if (TOPIC.equals(descriptor)) { + topic = newValue; + } else if (GROUP_ID.equals(descriptor)) { + groupId = newValue; --- End diff -- Thanks for pointing this out. Addressed. Fixed bootstrap server address for ConsumeKafka, too.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---