[ https://issues.apache.org/jira/browse/NIFI-15961?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pierre Villard updated NIFI-15961:
----------------------------------
    Description: 
Extend the *ConsumeKafka* processor with optional support for Kafka share groups
([KIP-932|https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka]).
Share groups distribute records cooperatively across the consumers of a group
with per-record acknowledgement, decoupling consumer parallelism from the
number of partitions on the subscribed topics.

*Proposed Change*

Add a new *Group Type* property to *ConsumeKafka* with values *Consumer Group* 
(the default) and {*}Share Group{*}.

When *Share Group* is selected:
 * The processor uses *KafkaShareConsumer* via a new 
*KafkaShareConsumerService* SPI on *KafkaConnectionService*. The SPI is added 
as a default method that throws *UnsupportedOperationException*, so 
out-of-tree implementations stay binary- and source-compatible.
 * A new *Acknowledgement Mode* property ({*}Explicit{*} default, *Implicit* 
opt-in) controls how records are acknowledged. In *Explicit* mode every record 
is acknowledged individually; on session rollback records are {*}RELEASE{*}d 
back to the share group for immediate redelivery. In *Implicit* mode the broker 
treats all delivered records as *ACCEPT* on the next poll/commit; on rollback 
the consumer is closed so the broker's record-acquisition lock can expire and 
the records become eligible for redelivery.
 * Classic-group properties (Topic Format, Auto Offset Reset, Commit Offsets) 
are hidden when *Share Group* is selected because they have no analogue in the 
share-group protocol. The starting position for a new share group is managed 
out of band via *kafka-share-groups.sh --reset-offsets* or 
{*}Admin.alterShareGroupOffsets{*}.
 * Verification and topic sampling continue to use *Explicit* acknowledgement 
internally so sampled records are released back to the share group, regardless 
of the user-selected mode.
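
The compatibility and rollback rules above can be sketched in plain Java. This is a minimal illustration only, not the proposed implementation: *ShareConsumerService*, *ConnectionService*, *LegacyConnectionService*, *AckMode*, *RollbackAction* and *AckPolicy* are hypothetical stand-ins, and none of them are the real NiFi or Kafka types. The sketch assumes the default method is the only addition to the existing interface.

```java
// Hypothetical stand-ins for illustration; not the real NiFi or Kafka types.
interface ShareConsumerService {
    String groupId();
}

interface ConnectionService {
    // Method that existing implementations already provide.
    String bootstrapServers();

    // New method with a default body: older implementations compile and link
    // unchanged and fail only when a share consumer is actually requested.
    default ShareConsumerService getShareConsumerService(String groupId) {
        throw new UnsupportedOperationException("Share groups are not supported");
    }
}

// Written before the new method existed; needs no changes.
final class LegacyConnectionService implements ConnectionService {
    @Override
    public String bootstrapServers() {
        return "localhost:9092";
    }
}

enum AckMode { EXPLICIT, IMPLICIT }

enum RollbackAction { RELEASE_RECORDS, CLOSE_CONSUMER }

final class AckPolicy {
    private AckPolicy() { }

    // Verification and sampling always use EXPLICIT, whatever the user chose.
    static AckMode effectiveMode(AckMode selected, boolean verifyingOrSampling) {
        return verifyingOrSampling ? AckMode.EXPLICIT : selected;
    }

    // EXPLICIT: release records back to the share group for redelivery.
    // IMPLICIT: close the consumer and let the acquisition lock expire.
    static RollbackAction onRollback(AckMode mode) {
        return mode == AckMode.EXPLICIT
                ? RollbackAction.RELEASE_RECORDS
                : RollbackAction.CLOSE_CONSUMER;
    }
}
```

An implementation that supports share groups would override the default method; one that does not keeps working for *Consumer Group* and raises the exception only if *Share Group* is selected.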

  was:
Extend the *ConsumeKafka* processor with optional support for Kafka share 
groups 
([KIP-932|https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka]).
 Share groups distribute records cooperatively across the consumers of a group 
with per-record acknowledgement, decoupling consumer parallelism from the 
number of partitions on the subscribed topics.

*Proposed Change*

Add a new *Group Type* property to *ConsumeKafka* with values *Consumer Group* 
(the default) and {*}Share Group{*}. When *Share Group* is selected:
 * The processor uses *KafkaShareConsumer* via a new 
*KafkaShareConsumerService* SPI on *KafkaConnectionService* (added as a default 
method that throws *UnsupportedOperationException* so out-of-tree 
implementations stay binary- and source-compatible).
 * A new *Acknowledgement Mode* property ({*}Explicit{*} default, *Implicit* 
opt-in) controls how records are acknowledged. In *Explicit* mode every record 
is acknowledged individually; on session rollback records are {*}RELEASE{*}d 
back to the share group for immediate redelivery. In *Implicit* mode the broker 
treats all delivered records as *ACCEPT* on the next poll/commit; on rollback 
the consumer is closed so the broker's record-acquisition lock can expire and 
the records become eligible for redelivery.
 * Classic-group properties (Topic Format, Auto Offset Reset, Commit Offsets) 
are hidden when *Share Group* is selected because they have no analogue in the 
share-group protocol. The starting position for a new share group is managed 
out of band via *kafka-share-groups.sh --reset-offsets* or 
{*}Admin.alterShareGroupOffsets{*}.
 * Verification and topic sampling continue to use *Explicit* acknowledgement 
internally so sampled records are released back to the share group, regardless 
of the user-selected mode.


> Add Kafka Share Group support to ConsumeKafka
> ---------------------------------------------
>
>                 Key: NIFI-15961
>                 URL: https://issues.apache.org/jira/browse/NIFI-15961
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>            Reporter: Pierre Villard
>            Assignee: Pierre Villard
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
