Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/563#discussion_r69829390
  
    --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java ---
    @@ -239,4 +300,188 @@ private void releaseFlowFile(FlowFile flowFile, ProcessContext context, ProcessS
             this.getLogger().info("Successfully received {} from Kafka with {} messages in {} millis", new Object[] { flowFile, msgCount, executionDuration });
             session.transfer(flowFile, REL_SUCCESS);
         }
    +
    +    @Override
    +    public StateMap getState() throws IOException {
    +
    +        if (!isReadyToAccessState()) {
    +            return null;
    +        }
    +
    +        final String groupId = kafkaProperties.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
    +        return submitConsumerGroupCommand("Fetch offsets", consumer -> {
    +            final Map<String, String> partitionOffsets = consumer.partitionsFor(topic).stream()
    +                    .map(p -> new TopicPartition(topic, p.partition()))
    +                    .map(tp -> new ImmutablePair<>(tp, consumer.committed(tp)))
    +                    .filter(tpo -> tpo.right != null)
    +                    .collect(Collectors.toMap(
    +                            tpo -> "partition:" + tpo.left.partition(),
    +                            tpo -> String.valueOf(tpo.right.offset())));
    +
    +            logger.info("Retrieved offsets from Kafka, topic={}, groupId={}, partitionOffsets={}",
    +                    topic, groupId, partitionOffsets);
    +
    +            return new StandardStateMap(partitionOffsets, System.currentTimeMillis());
    +        }, null);
    +    }
    +
    +    private boolean isReadyToAccessState() {
    +        if (StringUtils.isEmpty(topic) || StringUtils.isEmpty(brokers) || kafkaProperties == null
    +                || StringUtils.isEmpty(kafkaProperties.getProperty(ConsumerConfig.GROUP_ID_CONFIG))) {
    +            return false;
    +        }
    +        return true;
    +    }
    +
    +    /**
    +     * <p>Clear offsets stored in Kafka by committing -1 as the offset of
    +     * each partition of the specified topic.</p>
    +     *
    +     * <p>Kafka allows commitSync if one of the following conditions is met,
    +     * see kafka.coordinator.GroupCoordinator.handleCommitOffsets for details:</p>
    +     * <ol>
    +     * <li>The consumer is a member of the consumer group. In this case,
    +     * offsets can be updated even if other consumers are connected to Kafka.
    +     * It is dangerous to clear offsets while there are active consumers.
    +     * Once consumer.subscribe() and poll() are called, the consumer becomes
    +     * a member of the consumer group.</li>
    +     *
    +     * <li>There is no connected consumer within the group, and the Kafka
    +     * GroupCoordinator has marked the group as dead. This is safer but can
    +     * take longer.</li>
    +     * </ol>
    +     *
    +     * <p>The consumer group state transition is an asynchronous operation at
    +     * the Kafka group coordinator. Although clear() can only be called while
    +     * the processor is stopped, the consumer group may not yet be fully
    +     * removed at Kafka; in that case a CommitFailedException is thrown.</p>
    +     *
    +     * <p>The following log message appears in kafka.out on a Kafka broker
    +     * once the GroupCoordinator has marked the group as dead; this can take
    +     * more than 30 seconds:
    +     * <blockquote>[GroupCoordinator]: Group [gid] generation 1 is dead
    +     * and removed (kafka.coordinator.GroupCoordinator)</blockquote></p>
    +     */
    +    @Override
    +    public void clear() throws IOException {
    +
    +        if (!isReadyToAccessState()) {
    +            return;
    +        }
    +
    +        final String groupId = kafkaProperties.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
    +        final Boolean result = submitConsumerGroupCommand("Clear offsets", consumer -> {
    --- End diff --
    
    Same comment as on GetKafka: should this block onTrigger from getting new messages?
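    
    For reference, the collector in getState() above keys the state map as `partition:<n>` with offsets stored as strings, skipping partitions that have no committed offset yet. A minimal, dependency-free sketch of just that mapping step (class and method names are hypothetical; plain Integer/Long stand in for Kafka's TopicPartition/OffsetAndMetadata):
    
    ```java
    import java.util.LinkedHashMap;
    import java.util.Map;
    
    // Hypothetical helper mirroring the getState() collector above:
    // committed offsets per partition become "partition:<n>" -> "<offset>"
    // entries, and partitions with a null committed offset are skipped
    // (mirrors .filter(tpo -> tpo.right != null) in the diff).
    public class OffsetStateMapper {
    
        public static Map<String, String> toStateMap(Map<Integer, Long> committedOffsets) {
            final Map<String, String> state = new LinkedHashMap<>();
            for (Map.Entry<Integer, Long> e : committedOffsets.entrySet()) {
                if (e.getValue() == null) {
                    continue; // no committed offset for this partition
                }
                state.put("partition:" + e.getKey(), String.valueOf(e.getValue()));
            }
            return state;
        }
    
        public static void main(String[] args) {
            final Map<Integer, Long> committed = new LinkedHashMap<>();
            committed.put(0, 42L);
            committed.put(1, null); // nothing committed yet
            System.out.println(toStateMap(committed)); // prints {partition:0=42}
        }
    }
    ```
    
    The real processor builds these entries from consumer.committed(tp) per TopicPartition; this sketch only shows the key/value shape the StateMap ends up with.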

