jsancio commented on code in PR #16626:
URL: https://github.com/apache/kafka/pull/16626#discussion_r1684684173
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -348,22 +360,29 @@ private void onUpdateLeaderHighWatermark(
     private void updateListenersProgress(long highWatermark) {
         for (ListenerContext listenerContext : listenerContexts.values()) {
             listenerContext.nextExpectedOffset().ifPresent(nextExpectedOffset -> {
-                // Send snapshot to the listener, if the listener is at the beginning of the log and there is a snapshot,
-                // or the listener is trying to read an offset for which there isn't a segment in the log.
+                // Send snapshot to the listener, if there is a snapshot for the partition,
+                // and it is a new listener or
+                // the listener is trying to read an offset for which there isn't a segment in the log.
                 if (nextExpectedOffset < highWatermark &&
-                    ((nextExpectedOffset == 0 && latestSnapshot().isPresent()) ||
-                        nextExpectedOffset < log.startOffset())
+                    ((nextExpectedOffset == ListenerContext.STARTING_NEXT_OFFSET || nextExpectedOffset < log.startOffset())
+                        && latestSnapshot().isPresent())
                 ) {
-                    SnapshotReader<T> snapshot = latestSnapshot().orElseThrow(() -> new IllegalStateException(
+                    listenerContext.fireHandleSnapshot(latestSnapshot().get());
+                } else if (nextExpectedOffset == ListenerContext.STARTING_NEXT_OFFSET) {
+                    // Reset the next offset to 0 since it is a new listener context and there is no
+                    // bootstrapping checkpoint
+                    listenerContext.resetOffsetToLogStart();
Review Comment:
Yep. I'm adding a log message at INFO level, since this branch should be hit
only once per listener registration, and only until a snapshot is generated.
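
For reference, the branch order in the hunk above can be sketched as a small standalone decision function. This is only an illustration under stated assumptions: `ListenerProgressSketch`, `Action`, and `decide` are hypothetical names, the sentinel value `-1` is a guess (the actual value of `ListenerContext.STARTING_NEXT_OFFSET` is not shown in the diff), and the snapshot lookup is reduced to a boolean.

```java
public class ListenerProgressSketch {
    // Hypothetical sentinel for a listener that has not read anything yet.
    // The real value of ListenerContext.STARTING_NEXT_OFFSET is not shown in the diff.
    static final long STARTING_NEXT_OFFSET = -1L;

    // Hypothetical outcome type; the real code calls methods on ListenerContext instead.
    enum Action { SEND_SNAPSHOT, RESET_TO_LOG_START, NONE }

    static Action decide(
        long nextExpectedOffset,
        long highWatermark,
        long logStartOffset,
        boolean hasSnapshot
    ) {
        boolean newListener = nextExpectedOffset == STARTING_NEXT_OFFSET;

        if (nextExpectedOffset < highWatermark
                && (newListener || nextExpectedOffset < logStartOffset)
                && hasSnapshot) {
            // New listener, or listener behind the log start, and a snapshot exists.
            return Action.SEND_SNAPSHOT;
        } else if (newListener) {
            // New listener but no snapshot to send: start reading from the log start.
            // This is the branch where the INFO log message would go.
            return Action.RESET_TO_LOG_START;
        }
        // Otherwise the listener keeps reading from the log as usual.
        return Action.NONE;
    }

    public static void main(String[] args) {
        System.out.println(decide(STARTING_NEXT_OFFSET, 10, 0, true));  // SEND_SNAPSHOT
        System.out.println(decide(STARTING_NEXT_OFFSET, 10, 0, false)); // RESET_TO_LOG_START
        System.out.println(decide(5, 10, 0, true));                     // NONE
        System.out.println(decide(3, 10, 5, true));                     // SEND_SNAPSHOT
    }
}
```

In this sketch the reset branch is reachable only for a new listener when no snapshot is sent, which matches the expectation that the log message appears once per registration.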
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.