ableegoldman commented on a change in pull request #8677:
URL: https://github.com/apache/kafka/pull/8677#discussion_r428166926
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##########
@@ -171,7 +173,7 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams
                 "This can happen if the Kafka cluster is temporary not available. " +
                 "You can increase admin client config `retries` to be resilient against this error.", retries);
             log.error(timeoutAndRetryError);
-            throw new StreamsException(timeoutAndRetryError);
+            throw new TaskMigratedException("Time out for creating internal topics", new TimeoutException(timeoutAndRetryError));

Review comment:
   > assign the union of all previously assigned tasks and any new tasks we were able to set up

   I was worrying about the case where some internal topics got deleted, and we would cause trouble for the previous owner of the corresponding task. But I suppose if the topic was deleted randomly in the middle of processing, the thread would die anyway, so the odds of the original owner _not_ dying on internal topic deletion are pretty low.

   With this strategy, we would at least contain the blast radius to just that current owner: once it dies, that task has no previous owner and would not be assigned. So I'm pretty strongly in favor of this idea.

   Arguably we could just incorporate this into the existing `FallbackPriorTaskAssignor`, since it will just reduce to the current one in the case where all topics have been validated. I'm not sure if that would be more work or less, though.
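   To make the quoted strategy concrete, here is a minimal sketch of "assign the union of all previously assigned tasks and any new tasks we were able to set up". It is an illustration only, not the actual `FallbackPriorTaskAssignor` code: the class `PriorTaskUnionSketch`, the method `assign`, its parameters, and the use of plain strings for task and client ids are all hypothetical, and placing new tasks on a single `fallbackClient` is a simplifying assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch: none of these names come from Kafka Streams.
final class PriorTaskUnionSketch {

    /**
     * @param previousOwners    task id -> client that owned it in the previous generation
     * @param validatedNewTasks tasks whose internal topics were successfully set up
     * @param fallbackClient    client that receives validated tasks with no previous owner
     *                          (a real assignor would balance these; assumed here for brevity)
     * @return client id -> sorted set of assigned task ids
     */
    static Map<String, Set<String>> assign(final Map<String, String> previousOwners,
                                           final Set<String> validatedNewTasks,
                                           final String fallbackClient) {
        final Map<String, Set<String>> assignment = new HashMap<>();

        // Every previously owned task goes back to its previous owner,
        // whether or not its topics could be validated this time.
        for (final Map.Entry<String, String> entry : previousOwners.entrySet()) {
            assignment.computeIfAbsent(entry.getValue(), c -> new TreeSet<>()).add(entry.getKey());
        }

        // Tasks without a previous owner are assigned only if they were validated.
        // An unvalidated task with no previous owner is simply left unassigned.
        for (final String task : validatedNewTasks) {
            if (!previousOwners.containsKey(task)) {
                assignment.computeIfAbsent(fallbackClient, c -> new TreeSet<>()).add(task);
            }
        }
        return assignment;
    }
}
```

   In this sketch, if every task has been validated, the result is just the prior assignment plus the new tasks. If a task's previous owner has died, the task drops out of `previousOwners`, and unless its topics validate it is not handed to anyone else, which is the "contain the blast radius" behavior described above.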