mjsax commented on code in PR #13534:
URL: https://github.com/apache/kafka/pull/13534#discussion_r1164442798


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -1195,13 +1195,8 @@ public boolean commitNeeded() {
                         commitNeeded = true;
                         entry.setValue(offset - 1);
                     }
-                } catch (final TimeoutException error) {
-                    // the `consumer.position()` call should never block, because we know that we did process data
-                    // for the requested partition and thus the consumer should have a valid local position
-                    // that it can return immediately
-
-                    // hence, a `TimeoutException` indicates a bug and thus we rethrow it as fatal `IllegalStateException`
-                    throw new IllegalStateException(error);
+                } catch (final TimeoutException swallow) {
+                    log.debug("Could not get consumer position for partition {} due to: {}", partition, swallow);

Review Comment:
   Good question. I think the two cases are different:
   
   1. In `commitNeeded()`, we call `consumer.position()` if the flag `commitNeeded == false`, i.e., we know that we did not process any record, and thus we cannot be sure that the consumer has a valid, locally cached position. (Note that we introduced this code in the AK 3.1.0 release, and it seems it was more or less copied from `findOffset`, which is older.)
   2. `findOffset`, on the other hand, is only called for partitions that have the `commitNeeded` flag set to `true` (cf. `committableOffsetsAndMetadata()`, where we call `findOffset`). This code was already added in the 2.6 release.
   
   The ticket is also reported only for the `commitNeeded` case above, indicating that the older code is correct, while the bug was only introduced in 3.1, when the code might have been copied incorrectly.
   
   Does this make sense? Or am I missing anything?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org