hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r514537694



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -228,35 +234,81 @@ private void updateLeaderEndOffsetAndTimestamp(
         final LogOffsetMetadata endOffsetMetadata = log.endOffset();
 
         if (state.updateLocalState(currentTimeMs, endOffsetMetadata)) {
-            updateHighWatermark(state, currentTimeMs);
+            onUpdateLeaderHighWatermark(state, currentTimeMs);
         }
 
-        LogOffset endOffset = new LogOffset(endOffsetMetadata.offset, 
Isolation.UNCOMMITTED);
-        fetchPurgatory.maybeComplete(endOffset, currentTimeMs);
+        fetchPurgatory.maybeComplete(endOffsetMetadata.offset, currentTimeMs);
     }
 
-    private void updateHighWatermark(
-        EpochState state,
+    private void onUpdateLeaderHighWatermark(
+        LeaderState state,
         long currentTimeMs
     ) {
         state.highWatermark().ifPresent(highWatermark -> {
-            logger.debug("High watermark updated to {}", highWatermark);
+            logger.debug("Leader high watermark updated to {}", highWatermark);
             log.updateHighWatermark(highWatermark);
-
-            LogOffset offset = new LogOffset(highWatermark.offset, 
Isolation.COMMITTED);
-            appendPurgatory.maybeComplete(offset, currentTimeMs);
-            fetchPurgatory.maybeComplete(offset, currentTimeMs);
+            appendPurgatory.maybeComplete(highWatermark.offset, currentTimeMs);
+            maybeFireHandleCommit(highWatermark.offset);
         });
     }
 
-    @Override
-    public LeaderAndEpoch currentLeaderAndEpoch() {
-        return quorum.leaderAndEpoch();
+    private void maybeFireHandleCommit(long highWatermark) {
+        maybeFireHandleCommit(listenerContexts, highWatermark);
+    }
+
+    private void maybeFireHandleCommit(List<ListenerContext> listenerContexts, 
long highWatermark) {
+        // TODO: When there are multiple listeners, we can cache reads to save 
some work
+        for (ListenerContext listenerContext : listenerContexts) {
+            OptionalLong nextExpectedOffsetOpt = 
listenerContext.nextExpectedOffset();
+            if (!nextExpectedOffsetOpt.isPresent()) {
+                continue;
+            }
+
+            long nextExpectedOffset = nextExpectedOffsetOpt.getAsLong();
+            if (nextExpectedOffset < highWatermark) {
+                LogFetchInfo readInfo = log.read(nextExpectedOffset, 
Isolation.COMMITTED);
+                listenerContext.fireHandleCommit(nextExpectedOffset, 
readInfo.records);
+            }
+        }
+    }
+
+    private void maybeFireHandleCommit(long baseOffset, int epoch, List<T> 
records) {

Review comment:
       The only difference is the input. I will add some comments to try to 
clarify the usage.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to