wcarlson5 commented on a change in pull request #10634:
URL: https://github.com/apache/kafka/pull/10634#discussion_r629609542



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -900,6 +902,14 @@ private long pollPhase() {
 
         final int numRecords = records.count();
 
+        for (final TopicPartition topicPartition: records.partitions()) {
+            records
+                .records(topicPartition)
+                .stream()
+                .max(Comparator.comparing(ConsumerRecord::offset))
+                .ifPresent(t -> 
taskManager.updateTaskEndMetadata(topicPartition, t.offset()));

Review comment:
       It was intended to be the highest offset that the client knows is in the 
topic. In order to make that the case I am looking the most recent pull now.
   
   I will add the java docs to the TaskMetadata getters




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to