dongjinleekr commented on pull request #9414: URL: https://github.com/apache/kafka/pull/9414#issuecomment-724725724
Hi @vvcephei,

Here is the fix. The integration tests broke because `KafkaStreams#cleanUp` can be called regardless of whether the `StreamThread`s have terminated. If `KafkaStreams#cleanUp` is called while the `KafkaStreams` instance is still running, it deletes the `{state-store-directory}/{application-id}` directory. The `StreamThread` then crashes with an exception from `StateDirectory#directoryForTask`, because that method fails to create the task state store directory, i.e., `{state-store-directory}/{application-id}/{task-id}`.

Moreover, the `StateDirectory#directoryForTask` method has two additional vulnerabilities:

1. When it creates the task state store directory, it does not create missing parent directories. If `{state-store-directory}/{application-id}` does not exist, `taskDir.mkdir()` returns false and the method throws an exception. Since this behavior breaks the integration tests, I changed `taskDir.mkdir()` to `taskDir.mkdirs()` so that `{state-store-directory}/{application-id}` is created automatically.
2. The method only checks whether a `File` exists at `{state-store-directory}/{application-id}/{task-id}`, not whether it is actually a directory. I added a check for this case and updated `StateDirectoryTest#shouldThrowProcessorStateException` accordingly.
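The two fixes described above can be illustrated with plain `java.io.File`. This is a minimal sketch, not Kafka's actual `StateDirectory` code. The class `TaskDirSketch` and its `directoryForTask` helper are hypothetical, and the sketch throws `IllegalStateException` where Kafka would throw `ProcessorStateException`. The example relies only on standard JDK behavior: `File#mkdir()` returns `false` when the parent directory is missing, and `File#mkdirs()` creates any missing parents.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

public class TaskDirSketch {

    // Hypothetical stand-in for StateDirectory#directoryForTask (not Kafka's code).
    // IllegalStateException replaces Kafka's ProcessorStateException to stay self-contained.
    static File directoryForTask(File stateDir, String applicationId, String taskId) {
        File taskDir = new File(new File(stateDir, applicationId), taskId);
        if (taskDir.exists()) {
            // Vulnerability 2: a regular file at the task path must be rejected.
            if (!taskDir.isDirectory()) {
                throw new IllegalStateException("task path exists but is not a directory: " + taskDir);
            }
        } else if (!taskDir.mkdirs() && !taskDir.isDirectory()) {
            // Vulnerability 1: mkdirs() also recreates {application-id} if cleanUp() deleted it.
            // The isDirectory() fallback tolerates another thread creating the directory first.
            throw new IllegalStateException("failed to create task directory: " + taskDir);
        }
        return taskDir;
    }

    public static void main(String[] args) throws IOException {
        File stateDir = Files.createTempDirectory("kafka-streams").toFile();

        // mkdir() fails because {application-id} does not exist yet.
        File missingParent = new File(new File(stateDir, "my-app"), "0_0");
        System.out.println("mkdir:  " + missingParent.mkdir());    // false

        // The sketch creates the missing parent via mkdirs().
        File taskDir = directoryForTask(stateDir, "my-app", "0_0");
        System.out.println("mkdirs: " + taskDir.isDirectory());    // true

        // A regular file at a task path is rejected instead of being treated as a directory.
        new File(new File(stateDir, "my-app"), "0_1").createNewFile();
        try {
            directoryForTask(stateDir, "my-app", "0_1");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage().startsWith("task path exists"));
        }
    }
}
```

Because `mkdirs()` also returns `false` when the directory already exists, checking `isDirectory()` afterwards distinguishes a real failure from a directory that was created concurrently.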