guozhangwang commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r459799708
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -299,7 +318,17 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback,
restoreRecords.add(recordConverter.convert(record));
}
}
- offset = globalConsumer.position(topicPartition);
+ try {
+ offset = globalConsumer.position(topicPartition);
+ } catch (final TimeoutException error) {
+ // the `globalConsumer.position()` call should never block, because we know that we did
+ // a successful `position()` call above for the requested partition and thus the consumer
+ // should have a valid local position that it can return immediately
+
+ // hence, a `TimeoutException` indicates a bug and thus we rethrow it as fatal `IllegalStateException`
+ throw new IllegalStateException(error);
+ }
+
stateRestoreAdapter.restoreBatch(restoreRecords);
stateRestoreListener.onBatchRestored(topicPartition, storeName, offset, restoreRecords.size());
restoreCount += restoreRecords.size();
Review comment:
My understanding is that for non-global state stores, we also start
ticking whenever we cannot make progress, whether because of an exception or
because `poll()` returned no data. Is that right? If so, I'm +1 on covering the
same case for the global state store here.
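
To make the "start ticking" behavior concrete, here is a minimal sketch of what I have in mind. `ProgressTimer` and its methods are hypothetical names for illustration only, not existing Kafka Streams classes, and the timestamps are passed in explicitly so the logic is easy to follow. The timer starts on the first attempt that makes no progress (an exception or an empty `poll()`), expires once the timeout has elapsed without progress, and is cleared as soon as a batch is restored.

```java
// Hypothetical helper for illustration; not part of Kafka Streams.
final class ProgressTimer {
    private static final long NOT_TICKING = -1L;

    private final long timeoutMs;
    private long deadlineMs = NOT_TICKING;

    ProgressTimer(final long timeoutMs) {
        if (timeoutMs < 0) {
            throw new IllegalArgumentException("timeoutMs must not be negative");
        }
        this.timeoutMs = timeoutMs;
    }

    // Call after a batch was restored successfully: the timer stops ticking.
    void recordProgress() {
        deadlineMs = NOT_TICKING;
    }

    // Call when poll() returned no data or a retriable exception was caught.
    // Starts the timer on the first call; returns true once the deadline has passed.
    boolean recordNoProgress(final long nowMs) {
        if (deadlineMs == NOT_TICKING) {
            deadlineMs = nowMs + timeoutMs;
            return false;
        }
        return nowMs >= deadlineMs;
    }

    boolean isTicking() {
        return deadlineMs != NOT_TICKING;
    }

    public static void main(final String[] args) {
        final ProgressTimer timer = new ProgressTimer(100L);
        // Simulated restore loop: two empty polls, one successful batch, two more empty polls.
        System.out.println("expired: " + timer.recordNoProgress(0L));
        System.out.println("expired: " + timer.recordNoProgress(50L));
        timer.recordProgress();
        System.out.println("ticking: " + timer.isTicking());
        System.out.println("expired: " + timer.recordNoProgress(200L));
        System.out.println("expired: " + timer.recordNoProgress(300L));
    }
}
```

In the restore loop, the caller would invoke `recordNoProgress(...)` from both the exception handler and the empty-`poll()` branch, and treat a `true` result as the point at which to give up. How that maps onto the actual timeout handling in this PR is an assumption on my side.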