GitHub user BrainLogic opened a pull request: https://github.com/apache/flink/pull/3035
[FLINK-4905] Kafka test instability IllegalStateException: Client is not started

Root cause of the issue: `notifyCheckpointComplete` can be invoked during cancellation or after `runFetchLoop` fails, and it then calls `commitOffset` on an already closed `curatorClient`. The fix is to close `curatorClient` while holding the `checkpointLock`. A diagram in the JIRA issue describes the behaviour of `Kafka08Fetcher`.

Notes:

1. I don't like the approach where `checkpointLock` is leaked into `SourceContext`; this may lead to deadlock.
2. Work with `ZookeeperOffsetHandler` can continue even after the call to `Kafka08Fetcher.cancel`, for as long as the handler reference is not null.
3. `ZookeeperOffsetHandler` could hold a `ReadWriteLock` and take the `writeLock` only for the close operation, but I have doubts, because the Flink code base does not contain any `ReentrantLock`s. It is also possible to implement this logic without any locks, using a lock-free approach.
4. Also, **JDK 8** provides a powerful tool, `StampedLock`. In which version of Flink will we be able to use **JDK 8** features?

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/BrainLogic/flink FLINK-4905_PR

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3035.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3035

----
commit fd5524eb15ec772f95c4168818264c63e45f5784
Author: cube <aefimov.c...@gmail.com>
Date:   2016-12-20T23:41:12Z

    [FLINK-4905] Kafka test instability IllegalStateException: Client is not started

----
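
The race described in this pull request, and the idea of serializing the close with the offset commit on one shared lock, can be illustrated with a minimal Java sketch. This is not the code in the patch: `OffsetHandlerSketch` and `GuardedFetcherSketch` are hypothetical stand-ins for `ZookeeperOffsetHandler` and `Kafka08Fetcher`, and the boolean `started` flag only models the state of `curatorClient`.

```java
/** Hypothetical stand-in for ZookeeperOffsetHandler; not Flink's actual class. */
class OffsetHandlerSketch {
    private boolean started = true; // models the started/closed state of curatorClient
    private int commits = 0;

    void commitOffset(long offset) {
        if (!started) {
            // Same symptom as in the JIRA issue: committing on a closed client.
            throw new IllegalStateException("Client is not started");
        }
        commits++;
    }

    void close() {
        started = false;
    }

    int commitCount() {
        return commits;
    }
}

/** Hypothetical fetcher that performs commit and close under one shared lock. */
class GuardedFetcherSketch {
    private final Object checkpointLock;
    // Guarded by checkpointLock; set to null once the handler is closed.
    private OffsetHandlerSketch handler = new OffsetHandlerSketch();

    GuardedFetcherSketch(Object checkpointLock) {
        this.checkpointLock = checkpointLock;
    }

    /** Checkpoint-notification path; returns false if the handler is already closed. */
    boolean commitOffset(long offset) {
        synchronized (checkpointLock) {
            if (handler == null) {
                return false; // closed: skip the commit instead of failing
            }
            handler.commitOffset(offset);
            return true;
        }
    }

    /** Cancellation or fetch-loop failure path; safe to call more than once. */
    void closeHandler() {
        synchronized (checkpointLock) {
            if (handler != null) {
                handler.close();
                handler = null;
            }
        }
    }
}
```

Because both paths take the same lock and re-check the handler reference inside it, a commit either completes before the close or observes `null` and is skipped. The `ReadWriteLock` variant from note 3 would presumably take the read lock in `commitOffset` and the write lock in `closeHandler`.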