[ 
https://issues.apache.org/jira/browse/NIFI-15961?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pierre Villard updated NIFI-15961:
----------------------------------
    Status: Patch Available  (was: Open)

> Add Kafka Share Group support to ConsumeKafka
> ---------------------------------------------
>
>                 Key: NIFI-15961
>                 URL: https://issues.apache.org/jira/browse/NIFI-15961
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>            Reporter: Pierre Villard
>            Assignee: Pierre Villard
>            Priority: Major
>
> Extend the *ConsumeKafka* processor with optional support for Kafka share 
> groups 
> ([KIP-932|https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka]).
>  Share groups distribute records cooperatively across the consumers of a 
> group with per-record acknowledgement, decoupling consumer parallelism from 
> the number of partitions on the subscribed topics.
> *Proposed Change*
> Add a new *Group Type* property to *ConsumeKafka* with values *Consumer 
> Group* (the default) and {*}Share Group{*}.
> When *Share Group* is selected:
>  * The processor uses *KafkaShareConsumer* via a new 
> *KafkaShareConsumerService* SPI on *KafkaConnectionService* (added as a 
> default method that throws *UnsupportedOperationException* so out-of-tree 
> implementations stay binary- and source-compatible).
>  * A new *Acknowledgement Mode* property ({*}Explicit{*} default, *Implicit* 
> opt-in) controls how records are acknowledged. In *Explicit* mode every 
> record is acknowledged individually; on session rollback records are 
> {*}RELEASE{*}d back to the share group for immediate redelivery. In 
> *Implicit* mode the broker treats all delivered records as *ACCEPT* on the 
> next poll/commit; on rollback the consumer is closed so the broker's 
> record-acquisition lock can expire and the records become eligible for 
> redelivery.
>  * Classic-group properties (Topic Format, Auto Offset Reset, Commit Offsets) 
> are hidden when *Share Group* is selected because they have no analogue in 
> the share-group protocol. The starting position for a new share group is 
> managed out of band via *kafka-share-groups.sh --reset-offsets* or 
> {*}Admin.alterShareGroupOffsets{*}.
>  * Verification and topic sampling continue to use *Explicit* acknowledgement 
> internally so sampled records are released back to the share group, 
> regardless of the user-selected mode.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to