mjsax commented on code in PR #13534:
URL: https://github.com/apache/kafka/pull/13534#discussion_r1164442798


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -1195,13 +1195,8 @@ public boolean commitNeeded() {
                         commitNeeded = true;
                         entry.setValue(offset - 1);
                     }
-                } catch (final TimeoutException error) {
-                    // the `consumer.position()` call should never block, because we know that we did process data
-                    // for the requested partition and thus the consumer should have a valid local position
-                    // that it can return immediately
-
-                    // hence, a `TimeoutException` indicates a bug and thus we rethrow it as fatal `IllegalStateException`
-                    throw new IllegalStateException(error);
+                } catch (final TimeoutException swallow) {
+                    log.debug("Could not get consumer position for partition {} due to: {}", partition, swallow);

Review Comment:
   Good question.
   
   I think the two cases are different:
   
   1. In `commitNeeded()`, we call `consumer.position()` if the flag
   `commitNeeded == false`, i.e., we know that we did not process any record, and
   thus we cannot be sure that the consumer has a valid locally cached position.
   (Note that we introduced this code in the AK 3.1.0 release, and it seems it was
   more or less copied from `findOffset`, which is older.)
   
   2. `findOffset`, on the other hand, is only called for partitions that have
   the `commitNeeded` flag set to `true` (cf. `committableOffsetsAndMetadata()`,
   where we call `findOffset`). This code was already added in the 2.6 release.
   The ticket is also reported only for the `commitNeeded()` case above,
   indicating that this older code is correct, and that the bug was only
   introduced in 3.1, when the code might have been copied incorrectly.
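
   To make the difference concrete, here is a rough, dependency-free sketch of
   the two cases. It is not the actual `StreamTask` code: `PositionSource` and
   `PositionTimeoutException` are hypothetical stand-ins for the consumer and
   for Kafka's `TimeoutException`, partitions are plain strings, and
   `findOffset` is reduced to just the position lookup.

```java
import java.util.HashMap;
import java.util.Map;

class PositionTimeoutSketch {

    // Hypothetical stand-in for Kafka's TimeoutException (keeps the sketch dependency-free).
    static class PositionTimeoutException extends RuntimeException {
        PositionTimeoutException(final String message) {
            super(message);
        }
    }

    // Hypothetical stand-in for the consumer; only position() is modeled.
    interface PositionSource {
        long position(String partition);
    }

    // Case 1: the lookup runs only while the flag is still false, i.e. no record was
    // processed, so the consumer may not have a cached position. A timeout is tolerated.
    static boolean commitNeeded(final boolean flag,
                                final PositionSource consumer,
                                final Map<String, Long> consumedOffsets) {
        if (flag) {
            return true;
        }
        boolean needed = false;
        for (final Map.Entry<String, Long> entry : consumedOffsets.entrySet()) {
            try {
                final long position = consumer.position(entry.getKey());
                // position is the next offset to fetch; a gap larger than 1 means
                // the consumer advanced past offsets we never saw as records
                if (position > entry.getValue() + 1) {
                    needed = true;
                    entry.setValue(position - 1);
                }
            } catch (final PositionTimeoutException swallow) {
                // no cached position available: skip this partition for now
            }
        }
        return needed;
    }

    // Case 2: only reached for partitions that did process data, so a cached position
    // should exist; a timeout here would point to a bug and is rethrown as fatal.
    static long findOffset(final PositionSource consumer, final String partition) {
        try {
            return consumer.position(partition);
        } catch (final PositionTimeoutException error) {
            throw new IllegalStateException(error);
        }
    }

    public static void main(final String[] args) {
        final Map<String, Long> consumed = new HashMap<>();
        consumed.put("topic-0", 41L);
        final PositionSource timingOut = partition -> {
            throw new PositionTimeoutException("no cached position");
        };

        // Case 1 swallows the timeout and simply reports the result of the check.
        System.out.println(commitNeeded(false, timingOut, consumed));

        // Case 2 treats the same timeout as fatal.
        try {
            findOffset(timingOut, "topic-0");
        } catch (final IllegalStateException fatal) {
            System.out.println("fatal: " + fatal.getCause().getMessage());
        }
    }
}
```

   The point is only that the same exception gets a different treatment
   depending on whether we can assume a cached position.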
   
   Does this make sense? Or am I missing anything?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
