hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r514536828



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -228,35 +234,81 @@ private void updateLeaderEndOffsetAndTimestamp(
         final LogOffsetMetadata endOffsetMetadata = log.endOffset();
 
         if (state.updateLocalState(currentTimeMs, endOffsetMetadata)) {
-            updateHighWatermark(state, currentTimeMs);
+            onUpdateLeaderHighWatermark(state, currentTimeMs);
         }
 
-            LogOffset endOffset = new LogOffset(endOffsetMetadata.offset, Isolation.UNCOMMITTED);
-        fetchPurgatory.maybeComplete(endOffset, currentTimeMs);
+        fetchPurgatory.maybeComplete(endOffsetMetadata.offset, currentTimeMs);
     }
 
-    private void updateHighWatermark(
-        EpochState state,
+    private void onUpdateLeaderHighWatermark(
+        LeaderState state,
         long currentTimeMs
     ) {
         state.highWatermark().ifPresent(highWatermark -> {
-            logger.debug("High watermark updated to {}", highWatermark);
+            logger.debug("Leader high watermark updated to {}", highWatermark);
             log.updateHighWatermark(highWatermark);
-
-            LogOffset offset = new LogOffset(highWatermark.offset, Isolation.COMMITTED);
-            appendPurgatory.maybeComplete(offset, currentTimeMs);
-            fetchPurgatory.maybeComplete(offset, currentTimeMs);
+            appendPurgatory.maybeComplete(highWatermark.offset, currentTimeMs);
+            maybeFireHandleCommit(highWatermark.offset);
         });
     }
 
-    @Override
-    public LeaderAndEpoch currentLeaderAndEpoch() {
-        return quorum.leaderAndEpoch();
+    private void maybeFireHandleCommit(long highWatermark) {
+        maybeFireHandleCommit(listenerContexts, highWatermark);
+    }
+
+    private void maybeFireHandleCommit(List<ListenerContext> listenerContexts, long highWatermark) {
+        // TODO: When there are multiple listeners, we can cache reads to save some work
+        for (ListenerContext listenerContext : listenerContexts) {
+            OptionalLong nextExpectedOffsetOpt = listenerContext.nextExpectedOffset();
+            if (!nextExpectedOffsetOpt.isPresent()) {
+                continue;
+            }
+
+            long nextExpectedOffset = nextExpectedOffsetOpt.getAsLong();
+            if (nextExpectedOffset < highWatermark) {
+                LogFetchInfo readInfo = log.read(nextExpectedOffset, Isolation.COMMITTED);
+                listenerContext.fireHandleCommit(nextExpectedOffset, readInfo.records);
+            }
+        }
+    }
+
+    private void maybeFireHandleCommit(long baseOffset, int epoch, List<T> records) {
+        for (ListenerContext listenerContext : listenerContexts) {
+            OptionalLong nextExpectedOffsetOpt = listenerContext.nextExpectedOffset();
+            if (!nextExpectedOffsetOpt.isPresent()) {
+                continue;
+            }
+
+            long nextExpectedOffset = nextExpectedOffsetOpt.getAsLong();
+            if (nextExpectedOffset == baseOffset) {
+                listenerContext.fireHandleCommit(baseOffset, epoch, records);
+            }
+        }
+    }
+
+    private void maybeFireHandleClaim(LeaderState state) {
+        for (ListenerContext listenerContext : listenerContexts) {
+            int leaderEpoch = state.epoch();

Review comment:
       I will move this outside the loop.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to