cadonna commented on code in PR #12638:
URL: https://github.com/apache/kafka/pull/12638#discussion_r971662847


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -444,6 +451,8 @@ public void restore(final Map<TaskId, Task> tasks) {
                 final Set<TaskId> corruptedTasks = new HashSet<>();
                 e.partitions().forEach(partition -> 
corruptedTasks.add(changelogs.get(partition).stateManager.taskId()));
                 throw new TaskCorruptedException(corruptedTasks, e);
+            } catch (final InterruptException interruptException) {
+                throw interruptException;

Review Comment:
   Yes, it will. However, `InterruptException` is a `KafkaException`, thus it 
would be caught by the `catch`-clause below and rethrown as a 
`StreamsException`. I thought it would be easier to directly catch an 
`InterruptException` in `DefaultStateUpdater` instead of catching a 
`StreamsException`, unwrap it, verify if it is an `InterruptException`, and if 
not rethrow it. 



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -444,6 +451,8 @@ public void restore(final Map<TaskId, Task> tasks) {
                 final Set<TaskId> corruptedTasks = new HashSet<>();
                 e.partitions().forEach(partition -> 
corruptedTasks.add(changelogs.get(partition).stateManager.taskId()));
                 throw new TaskCorruptedException(corruptedTasks, e);
+            } catch (final InterruptException interruptException) {
+                throw interruptException;

Review Comment:
   Yes, it will. However, `InterruptException` is a `KafkaException`, thus it 
would be caught by the `catch`-clause below and be rethrown as a 
`StreamsException`. I thought it would be easier to directly catch an 
`InterruptException` in `DefaultStateUpdater` instead of catching a 
`StreamsException`, unwrap it, verify if it is an `InterruptException`, and if 
not rethrow it. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to