pvillard31 opened a new pull request, #10769:
URL: https://github.com/apache/nifi/pull/10769

   # Summary
   
   NIFI-15464 - ConsumeKafka - Commit pending offsets for revoked partitions
   
   ### Problem
   
   The `ConsumeKafka` processor using `Kafka3ConnectionService` causes duplicate message processing when a consumer group rebalance occurs (for example, when a consumer starts or stops). This happens because the `Kafka3ConsumerService.onPartitionsRevoked()` callback only performs a rollback instead of committing pending offsets before the partitions are revoked.
   
   When a rebalance is triggered:
   - Kafka calls `onPartitionsRevoked()` before the consumer loses ownership of its partitions
   - The current implementation only rolls back to the last committed offset
   - The offsets of any messages that were polled but not yet committed are discarded
   - When the processor later attempts to commit offsets, it receives a `RebalanceInProgressException`
   - Another consumer then re-processes the same messages, causing duplicates
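
   The sequence above can be illustrated with a toy model of a single partition changing owners. This is a hypothetical sketch, not Kafka or NiFi code: `RebalanceSimulation`, `run`, and `duplicates` are illustrative names, and the model assumes the new owner resumes exactly at the group's committed offset, which by Kafka convention is the offset of the next record to read.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   /**
    * Toy model of one partition changing owners during a rebalance.
    * Illustrative only: no Kafka client or NiFi classes are used.
    */
   final class RebalanceSimulation {

       private RebalanceSimulation() {
       }

       /**
        * Consumer A processes batchSize records from the committed offset and then
        * loses the partition. Consumer B then processes batchSize records starting
        * from whatever offset is committed at that point.
        *
        * @param commitOnRevoke false models "rollback only", true models
        *                       "commit pending offsets before revocation"
        * @return every offset processed, in order, first by A and then by B
        */
       static List<Long> run(int batchSize, boolean commitOnRevoke) {
           long committed = 0;               // group's committed offset: next record to read
           List<Long> processed = new ArrayList<>();

           long positionA = committed;       // consumer A's in-memory position
           for (int i = 0; i < batchSize; i++) {
               processed.add(positionA++);   // A polls and processes one record
           }

           if (commitOnRevoke) {
               committed = positionA;        // commit "last processed + 1" before losing the partition
           }                                 // else: rollback only, committed offset is unchanged

           long positionB = committed;       // consumer B resumes from the committed offset
           for (int i = 0; i < batchSize; i++) {
               processed.add(positionB++);
           }
           return processed;
       }

       /** Number of processed entries whose offset had already been processed. */
       static long duplicates(List<Long> processed) {
           return processed.size() - processed.stream().distinct().count();
       }
   }
   ```

   With `batchSize = 5`, the rollback-only path processes offsets 0 through 4 twice (five duplicates), while committing before revocation lets the next owner start at offset 5.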
   
   This issue did not occur with the legacy `ConsumeKafka_2_6` processor in 
NiFi 1.x.
   
   ### Root Cause
   
   In NiFi 1.x, the `ConsumerLease` class implemented 
`ConsumerRebalanceListener` and had direct access to both the uncommitted 
offsets (tracked internally) and the `ProcessSession`. When 
`onPartitionsRevoked()` was called, it would commit pending offsets before 
partitions were revoked.
   
   In NiFi 2.x, the architecture changed:
   - `Kafka3ConsumerService` handles Kafka consumer operations and implements 
`ConsumerRebalanceListener`
   - `ConsumeKafka` processor handles session/FlowFile operations and offset 
tracking (via `OffsetTracker`)
   - There was no mechanism for `Kafka3ConsumerService` to commit pending 
offsets during `onPartitionsRevoked()` because it didn't track them
   
   ### Solution
   
   This PR restores the NiFi 1.x behavior by tracking uncommitted offsets 
internally within `Kafka3ConsumerService`:
   - Added a `ConcurrentHashMap<TopicPartition, Long>` to track the maximum 
offset for each partition as records are polled
   - Modified `poll()` to update the tracked offsets for each record consumed
   - Modified `onPartitionsRevoked()` to commit the tracked offsets for revoked 
partitions before they are taken away
   - Modified `commit()` to clear tracked offsets for partitions that have been 
committed
   - Modified `rollback()` to clear tracked offsets for partitions being rolled 
back
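
   The bookkeeping described in the list above can be sketched in isolation as follows. This is a minimal sketch under stated assumptions, not the code in this PR: `TopicPartition` and `PolledRecord` are local stand-ins for the Kafka client types, and `OffsetCommitter` and `UncommittedOffsets` are hypothetical names, with `OffsetCommitter` standing in for the consumer's `commitSync(Map<TopicPartition, OffsetAndMetadata>)`. It assumes the committed value is the highest polled offset plus one, since Kafka treats a committed offset as the position of the next record to consume.

   ```java
   import java.util.Collection;
   import java.util.HashMap;
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;

   /** Local stand-in for org.apache.kafka.common.TopicPartition. */
   record TopicPartition(String topic, int partition) {
   }

   /** Local stand-in for a polled record, keeping only what the tracking needs. */
   record PolledRecord(TopicPartition partition, long offset) {
   }

   /** Hypothetical commit hook; a real service would call commitSync on its Kafka consumer. */
   interface OffsetCommitter {
       void commitSync(Map<TopicPartition, Long> nextOffsets);
   }

   /** Sketch of uncommitted-offset tracking inside a consumer service. */
   final class UncommittedOffsets {
       private final Map<TopicPartition, Long> maxPolledOffsets = new ConcurrentHashMap<>();
       private final OffsetCommitter committer;

       UncommittedOffsets(OffsetCommitter committer) {
           this.committer = committer;
       }

       /** From poll(): remember the highest offset seen for each partition. */
       void onPolled(Iterable<PolledRecord> records) {
           for (PolledRecord record : records) {
               maxPolledOffsets.merge(record.partition(), record.offset(), Math::max);
           }
       }

       /** From onPartitionsRevoked(): commit pending offsets of the revoked partitions only. */
       void onPartitionsRevoked(Collection<TopicPartition> revoked) {
           Map<TopicPartition, Long> toCommit = new HashMap<>();
           for (TopicPartition partition : revoked) {
               Long maxOffset = maxPolledOffsets.get(partition);
               if (maxOffset != null) {
                   toCommit.put(partition, maxOffset + 1); // Kafka commits the NEXT offset to read
               }
           }
           if (!toCommit.isEmpty()) {
               committer.commitSync(toCommit);
               // Forget the offsets only once the commit call has returned without throwing.
               maxPolledOffsets.keySet().removeAll(toCommit.keySet());
           }
       }

       /** From commit() or rollback(): those partitions no longer have pending offsets. */
       void clear(Collection<TopicPartition> partitions) {
           maxPolledOffsets.keySet().removeAll(partitions);
       }

       /** Read-only copy of the pending offsets, for inspection. */
       Map<TopicPartition, Long> pending() {
           return Map.copyOf(maxPolledOffsets);
       }
   }
   ```

   In this sketch, offsets are removed from the map only after the commit call returns, so a failed commit leaves them tracked, and `ConcurrentHashMap.merge` with `Math::max` keeps each per-partition update atomic if the map is ever touched from more than one thread.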
   
   This approach mirrors what was done in `ConsumerLease.uncommittedOffsetsMap` 
in NiFi 1.x.
   
   ### Important Note
   
   While this fix restores correct behavior, a cleaner architectural approach 
would be to introduce a callback mechanism where the processor can be notified 
during `onPartitionsRevoked()` and provide the offsets to commit. This would:
   - Maintain proper separation of concerns between the service and processor 
layers
   - Preserve the correct ordering of session commit before Kafka offset commit
   - Give the processor full control over what gets committed during rebalance
   
   This larger refactor could be done as a follow-up if desired. However, it would also increase the coupling between the processors and the service layer, so I am not sure it is a good idea.
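
   The callback alternative described in this note could look roughly like the following sketch. Everything here is hypothetical and is not part of this PR or of the NiFi API: `RevocationCallback`, `CallbackConsumerService`, and `ProcessorSide` are illustrative names, Kafka and session commits are replaced by entries in an event list, and the processor is assumed to already know the next offset to commit for each partition it has processed.

   ```java
   import java.util.Collection;
   import java.util.List;
   import java.util.Map;
   import java.util.stream.Collectors;

   /** Local stand-in for org.apache.kafka.common.TopicPartition. */
   record TopicPartition(String topic, int partition) {
   }

   /**
    * Hypothetical callback a processor could register with the consumer service.
    * Given the partitions being revoked, the processor commits its session and
    * returns the next offsets that are now safe to commit to Kafka.
    */
   interface RevocationCallback {
       Map<TopicPartition, Long> onPartitionsRevoked(Collection<TopicPartition> revoked);
   }

   /** Service side: forwards revocations instead of tracking offsets itself. */
   final class CallbackConsumerService {
       final List<String> events;
       private RevocationCallback callback; // hypothetical registration point

       CallbackConsumerService(List<String> events) {
           this.events = events;
       }

       void setRevocationCallback(RevocationCallback callback) {
           this.callback = callback;
       }

       /** Would be called from the service's ConsumerRebalanceListener. */
       void onPartitionsRevoked(Collection<TopicPartition> revoked) {
           if (callback == null) {
               events.add("rollback"); // no processor registered: keep the rollback-only behavior
               return;
           }
           Map<TopicPartition, Long> offsets = callback.onPartitionsRevoked(revoked);
           if (!offsets.isEmpty()) {
               events.add("kafka-commit " + offsets); // stand-in for the consumer's commitSync(...)
           }
       }
   }

   /** Processor side: commits its session first, then returns the matching offsets. */
   record ProcessorSide(List<String> events, Map<TopicPartition, Long> nextOffsets)
           implements RevocationCallback {

       @Override
       public Map<TopicPartition, Long> onPartitionsRevoked(Collection<TopicPartition> revoked) {
           events.add("session-commit"); // stand-in for committing the NiFi ProcessSession
           return nextOffsets.entrySet().stream()
                   .filter(entry -> revoked.contains(entry.getKey()))
                   .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
       }
   }
   ```

   In this shape the service never tracks offsets itself: it only forwards the revoked partitions, and the processor controls both which offsets are committed and that its session is committed first. That control is also where the extra coupling between the two layers comes from.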
   
   ### Testing
   
   I spent some time adding integration tests, and they pass locally. Depending on timing, they may turn out to be flaky. If that's the case, we can drop the new IT class entirely, but let's see how it goes.
   
   # Tracking
   
   Please complete the following tracking steps prior to pull request creation.
   
   ### Issue Tracking
   
   - [ ] [Apache NiFi Jira](https://issues.apache.org/jira/browse/NIFI) issue 
created
   
   ### Pull Request Tracking
   
   - [ ] Pull Request title starts with Apache NiFi Jira issue number, such as 
`NIFI-00000`
   - [ ] Pull Request commit message starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   - [ ] Pull request contains [commits 
signed](https://docs.github.com/en/authentication/managing-commit-signature-verification/signing-commits)
 with a registered key indicating `Verified` status
   
   ### Pull Request Formatting
   
   - [ ] Pull Request based on current revision of the `main` branch
   - [ ] Pull Request refers to a feature branch with one commit containing 
changes
   
   # Verification
   
   Please indicate the verification steps performed prior to pull request 
creation.
   
   ### Build
   
   - [ ] Build completed using `./mvnw clean install -P contrib-check`
     - [ ] JDK 21
     - [ ] JDK 25
   
   ### Licensing
   
   - [ ] New dependencies are compatible with the [Apache License 
2.0](https://apache.org/licenses/LICENSE-2.0) according to the [License 
Policy](https://www.apache.org/legal/resolved.html)
   - [ ] New dependencies are documented in applicable `LICENSE` and `NOTICE` 
files
   
   ### Documentation
   
   - [ ] Documentation formatting appears as expected in rendered files
   


-- 
This is an automated message from the Apache Git Service.