Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/563#discussion_r72899732
  
    --- Diff: 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
 ---
    @@ -481,4 +493,59 @@ private void releaseFlowFile(FlowFile flowFile, 
ProcessSession session, Map<Stri
                 session.transfer(flowFile, REL_SUCCESS);
             }
         }
    +
    +    @Override
    +    public ExternalStateScope getExternalStateScope() {
    +        return ExternalStateScope.CLUSTER;
    +    }
    +
    +    @Override
    +    public StateMap getExternalState() throws IOException {
    +        // We don't have to synchronize with onTrigger here,
    +        // since it merely retrieves state from Zk using different 
channel, it doesn't affect consuming.
    +        if (!isReadyToAccessState()) {
    +            return null;
    +        }
    +        final Map<String, String> partitionOffsets = 
KafkaUtils.retrievePartitionOffsets(zookeeperConnectionString, topic, groupId);
    +
    +        return new StandardStateMap(partitionOffsets, 
System.currentTimeMillis());
    +    }
    +
    +    private boolean isReadyToAccessState() {
    +        return !StringUtils.isEmpty(zookeeperConnectionString)
    +                && !StringUtils.isEmpty(topic)
    +                && !StringUtils.isEmpty(groupId);
    +    }
    +
    +    @Override
    +    public void clearExternalState() throws IOException {
    +        if (!isReadyToAccessState()) {
    +            return;
    +        }
    +        // Block onTrigger starts creating new consumer until clear offset 
finishes.
    +        synchronized (this.consumerStreamsReady) {
    +            KafkaUtils.clearPartitionOffsets(zookeeperConnectionString, 
topic, groupId);
    +        }
    +    }
    +
    +    /**
    +     * GetKafka overrides this method in order to capture processor's 
property values required when it retrieves
    +     * its state managed externally at Kafka. Since view/clear state 
operation can be executed before onTrigger() is called,
    +     * we need to capture these values as it's modified. This method is 
also called when NiFi restarts and loads configs,
    +     * so users can access external states right after restart of NiFi.
    +     * @param descriptor of the modified property
    +     * @param oldValue non-null property value (previous)
    +     * @param newValue the new property value or if null indicates the 
property
    +     */
    +    @Override
    +    public void onPropertyModified(PropertyDescriptor descriptor, String 
oldValue, String newValue) {
    +        if (ZOOKEEPER_CONNECTION_STRING.equals(descriptor)) {
    +            zookeeperConnectionString = newValue;
    +        } else if (TOPIC.equals(descriptor)) {
    +            topic = newValue;
    +        } else if (GROUP_ID.equals(descriptor)) {
    +            groupId = newValue;
    --- End diff --
    
    Thanks for pointing this out. Addressed.
    Fixed bootstrap server address for ConsumeKafka, too.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to