jsancio commented on code in PR #16626:
URL: https://github.com/apache/kafka/pull/16626#discussion_r1684684173


##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -348,22 +360,29 @@ private void onUpdateLeaderHighWatermark(
     private void updateListenersProgress(long highWatermark) {
         for (ListenerContext listenerContext : listenerContexts.values()) {
             listenerContext.nextExpectedOffset().ifPresent(nextExpectedOffset -> {
-                // Send snapshot to the listener, if the listener is at the beginning of the log and there is a snapshot,
-                // or the listener is trying to read an offset for which there isn't a segment in the log.
+                // Send snapshot to the listener, if there is a snapshot for the partition,
+                // and it is a new listener or
+                // the listener is trying to read an offset for which there isn't a segment in the log.
                 if (nextExpectedOffset < highWatermark &&
-                    ((nextExpectedOffset == 0 && latestSnapshot().isPresent()) ||
-                     nextExpectedOffset < log.startOffset())
+                    ((nextExpectedOffset == ListenerContext.STARTING_NEXT_OFFSET || nextExpectedOffset < log.startOffset())
+                     && latestSnapshot().isPresent())
                 ) {
-                    SnapshotReader<T> snapshot = latestSnapshot().orElseThrow(() -> new IllegalStateException(
+                    listenerContext.fireHandleSnapshot(latestSnapshot().get());
+                } else if (nextExpectedOffset == ListenerContext.STARTING_NEXT_OFFSET) {
+                    // Reset the next offset to 0 since it is a new listener context and there is no
+                    // bootstraping checkpoint
+                    listenerContext.resetOffsetToLogStart();

Review Comment:
   Yep. I'm adding a log message at INFO level, since this branch should only be hit once per
   listener registration, and only until a snapshot is generated.
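
   For readers following the hunk above, here is a minimal, standalone sketch of the branch being discussed, with the INFO log placed in the reset path. It is not the code from the PR: the class `ListenerProgressSketch`, the `Action` enum, the `decide` method, the log text, and the use of `java.util.logging` are all hypothetical stand-ins, and the value `-1` for `STARTING_NEXT_OFFSET` is an assumption, since the diff does not show the constant's definition.

```java
import java.util.logging.Logger;

// Hypothetical, simplified model of the branch in updateListenersProgress.
// None of these names come from the PR except STARTING_NEXT_OFFSET.
final class ListenerProgressSketch {
    // Stand-in logger; the real client may use a different logging API.
    private static final Logger LOG =
        Logger.getLogger(ListenerProgressSketch.class.getName());

    // ASSUMPTION: sentinel meaning "new listener, nothing delivered yet".
    // The actual value of ListenerContext.STARTING_NEXT_OFFSET is not in the diff.
    static final long STARTING_NEXT_OFFSET = -1L;

    enum Action { SEND_SNAPSHOT, RESET_TO_LOG_START, NONE }

    static Action decide(
        long nextExpectedOffset,
        long highWatermark,
        long logStartOffset,
        boolean snapshotPresent
    ) {
        boolean newListener = nextExpectedOffset == STARTING_NEXT_OFFSET;
        boolean behindLogStart = nextExpectedOffset < logStartOffset;

        if (nextExpectedOffset < highWatermark
                && (newListener || behindLogStart)
                && snapshotPresent) {
            // Mirrors the fireHandleSnapshot branch in the hunk.
            return Action.SEND_SNAPSHOT;
        } else if (newListener) {
            // Mirrors the resetOffsetToLogStart branch. For a given listener this
            // runs at most once, because the reset moves it off the sentinel.
            LOG.info("New listener and no snapshot available; resetting next offset to the log start");
            return Action.RESET_TO_LOG_START;
        }
        // Otherwise the listener simply continues reading from the log.
        return Action.NONE;
    }

    public static void main(String[] args) {
        // New listener, no snapshot yet: the reset path (and the INFO log) is taken.
        System.out.println(decide(STARTING_NEXT_OFFSET, 10L, 0L, false));
    }
}
```

   Under these assumptions the INFO message stays low volume: once the offset is reset it is no longer equal to the sentinel, and once a snapshot exists a newly registered listener takes the snapshot branch instead.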


