lianetm commented on code in PR #17699:
URL: https://github.com/apache/kafka/pull/17699#discussion_r1869882439
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -515,6 +531,89 @@ private void process(final ShareAcknowledgementCommitCallbackRegistrationEvent e
         manager.setAcknowledgementCommitCallbackRegistered(event.isCallbackRegistered());
     }
+    private void process(final SeekUnvalidatedEvent event) {
+        try {
+            event.offsetEpoch().ifPresent(epoch -> metadata.updateLastSeenEpochIfNewer(event.partition(), epoch));
+            SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(
+                event.offset(),
+                event.offsetEpoch(),
+                metadata.currentLeader(event.partition())
+            );
+            subscriptions.seekUnvalidated(event.partition(), newPosition);
+            event.future().complete(null);
+        } catch (Exception e) {
+            event.future().completeExceptionally(e);
+        }
+    }
+
+    private void process(final PausePartitionsEvent event) {
+        try {
+            Collection<TopicPartition> partitions = event.partitions();
+            log.debug("Pausing partitions {}", partitions);
+
+            for (TopicPartition partition : partitions) {
+                subscriptions.pause(partition);
+            }
+
+            event.future().complete(null);
+        } catch (Exception e) {
+            event.future().completeExceptionally(e);
+        }
+    }
+
+    private void process(final ResumePartitionsEvent event) {
+        try {
+            Collection<TopicPartition> partitions = event.partitions();
+            log.debug("Resuming partitions {}", partitions);
+
+            for (TopicPartition partition : partitions) {
+                subscriptions.resume(partition);
+            }
+
+            event.future().complete(null);
+        } catch (Exception e) {
+            event.future().completeExceptionally(e);
+        }
+    }
+
+    private void process(final CurrentLagEvent event) {
Review Comment:
No need, you're right. My concern was that, with the move to the background,
the consumer tests would be mocking this part, but that's not the case, so
the existing KafkaConsumerTests that cover lag will also cover this. All good.
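
For readers following the thread, the handlers in the hunk above share one shape: do the work, complete the event's future on success, and complete it exceptionally on failure, so that a test driving the real processor (rather than a mock) observes either outcome. The snippet below is only a minimal sketch of that shape under stated assumptions. `PauseEventSketch`, `FakeSubscriptions`, and `processPause` are hypothetical names, partitions are plain strings, and nothing here is the actual Kafka code or test.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

public class PauseEventSketch {

    // Hypothetical stand-in for the subscription state; not the Kafka class.
    static final class FakeSubscriptions {
        final Set<String> assigned = new HashSet<>();
        final Set<String> paused = new HashSet<>();

        void pause(String partition) {
            // Assumption for illustration: pausing an unassigned partition fails.
            if (!assigned.contains(partition)) {
                throw new IllegalStateException("No current assignment for partition " + partition);
            }
            paused.add(partition);
        }
    }

    // Same shape as the handlers in the diff: try the work, then complete the
    // future normally, or complete it exceptionally if anything throws.
    static CompletableFuture<Void> processPause(FakeSubscriptions subscriptions,
                                                Collection<String> partitions) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        try {
            for (String partition : partitions) {
                subscriptions.pause(partition);
            }
            future.complete(null);
        } catch (Exception e) {
            future.completeExceptionally(e);
        }
        return future;
    }

    public static void main(String[] args) {
        FakeSubscriptions subscriptions = new FakeSubscriptions();
        subscriptions.assigned.add("topic-0");

        // Success path: the future completes normally and the partition is paused.
        CompletableFuture<Void> ok = processPause(subscriptions, List.of("topic-0"));
        System.out.println("completed normally: " + (ok.isDone() && !ok.isCompletedExceptionally()));

        // Failure path: the exception is captured in the future instead of escaping.
        CompletableFuture<Void> failed = processPause(subscriptions, List.of("topic-1"));
        System.out.println("completed exceptionally: " + failed.isCompletedExceptionally());
    }
}
```

Because the caller only sees the future, a test that goes through the unmocked handler exercises both the state change and the error propagation in one place.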