Nikita-Shupletsov commented on code in PR #20665:
URL: https://github.com/apache/kafka/pull/20665#discussion_r2422215571


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -470,17 +470,24 @@ private OffsetAndMetadata findOffsetAndMetadata(final 
TopicPartition partition)
         Optional<Integer> leaderEpoch = 
partitionGroup.headRecordLeaderEpoch(partition);
         final long partitionTime = 
partitionGroup.partitionTimestamp(partition);
         if (offset == null) {
-            try {
-                if (nextOffsetsAndMetadataToBeConsumed.containsKey(partition)) 
{
-                    final OffsetAndMetadata offsetAndMetadata = 
nextOffsetsAndMetadataToBeConsumed.get(partition);
-                    offset = offsetAndMetadata.offset();
-                    leaderEpoch = offsetAndMetadata.leaderEpoch();
-                } else {
-                    // This indicates a bug and thus we rethrow it as fatal 
`IllegalStateException`
-                    throw new IllegalStateException("Stream task " + id + " 
does not know the partition: " + partition);
+            final OffsetAndMetadata offsetAndMetadata = 
nextOffsetsAndMetadataToBeConsumed.get(partition);
+            if (offsetAndMetadata == null) {
+                try {
+                    offset = mainConsumer.position(partition);

Review Comment:
   my understanding so far:
   we have some metadata that's one per task. but we have multiple input 
partitions. so instead of picking one, we just add it everywhere. and during 
restore we just read them all in whatever order and expect them to be more or 
less the same.
   so if we are adding a new partition on the fly, we want that partition to 
also have that metadata. if we follow that logic.
   so if we follow that logic, we need to commit even for that empty partition. 
   
   so I added a fallback to the previous logic when we ask the consumer about 
the offset
   
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to