nicktelford commented on code in PR #21738:
URL: https://github.com/apache/kafka/pull/21738#discussion_r2940412420
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -254,70 +269,49 @@ public StateStore globalStore(final String name) {
}
// package-private for test only
- void initializeStoreOffsetsFromCheckpoint(final boolean storeDirIsEmpty) {
- try {
- final Map<TopicPartition, Long> loadedCheckpoints =
checkpointFile.read();
-
- log.trace("Loaded offsets from the checkpoint file: {}",
loadedCheckpoints);
-
- for (final StateStoreMetadata store : stores.values()) {
- if (store.corrupted) {
- log.error("Tried to initialize store offsets for corrupted
store {}", store);
- throw new IllegalStateException("Should not initialize
offsets for a corrupted task");
- }
+ void initializeStoreOffsets(final boolean storeDirIsEmpty) {
+ for (final StateStoreMetadata store : stores.values()) {
+ if (store.corrupted) {
+ log.error("Tried to initialize store offsets for corrupted
store {}", store);
+ throw new ProcessorStateException(
Review Comment:
This is for backwards compatibility: the old `throw` here was inside a
`try` block whose `catch` caught the `IllegalStateException` and wrapped it
in a `ProcessorStateException`, so callers only ever saw the latter.
See here:
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L318
Since the `try` block was removed (this method no longer works with files),
we now wrap the exception directly at the point where we throw it.
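
To illustrate why the exception type seen by callers does not change, here is a minimal, self-contained sketch. It is not the actual Kafka code: `WrapSketch`, `oldStyle`, `newStyle`, `thrownType`, and the nested `ProcessorStateException` are hypothetical stand-ins, and the messages and the exact shape of the new `throw` are assumptions, since the diff above is truncated at that point.

```java
// Hypothetical sketch; none of these names are Kafka's real classes or methods.
final class WrapSketch {

    // Stand-in for Kafka's ProcessorStateException (assumed to be unchecked here).
    static class ProcessorStateException extends RuntimeException {
        ProcessorStateException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    // Old shape: the throw sits inside a try block, and the catch wraps it.
    static void oldStyle(final boolean corrupted) {
        try {
            if (corrupted) {
                throw new IllegalStateException("Should not initialize offsets for a corrupted task");
            }
        } catch (final RuntimeException e) {
            // The IllegalStateException never escapes; callers see the wrapper.
            throw new ProcessorStateException("Error initializing store offsets", e);
        }
    }

    // New shape (assumed): no try block, so the wrapping happens at the throw site.
    static void newStyle(final boolean corrupted) {
        if (corrupted) {
            throw new ProcessorStateException(
                "Error initializing store offsets",
                new IllegalStateException("Should not initialize offsets for a corrupted task"));
        }
    }

    // Helper: returns the simple class name of whatever the action throws, or "none".
    static String thrownType(final Runnable action) {
        try {
            action.run();
            return "none";
        } catch (final RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(final String[] args) {
        System.out.println(thrownType(() -> oldStyle(true)));
        System.out.println(thrownType(() -> newStyle(true)));
    }
}
```

In both variants the caller observes a `ProcessorStateException` whose cause is an `IllegalStateException`, which is the compatibility property described above.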