GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/3505

    [backport-1.2] [FLINK-6006] [kafka] Always use complete restored state in FlinkKafkaConsumer

    (This PR is the FLINK-6006 fix for Flink 1.2.)
    
    Previously, the Kafka consumer queried the partition list on
    restore and used it to filter out the restored state of partitions
    that do not exist in the list.
    
    If the returned partition list is incomplete for any reason (e.g.
    partitions that existed before are missing, perhaps due to temporary
    ZK / broker downtime), the state of the missing partitions is dropped
    and can no longer be recovered.
    
    This PR fixes this by always restoring the complete state, without
    any filtering. We simply let the consumer fail if the partitions
    assigned to the consuming threads / Kafka clients are unreachable when
    the consumer starts running.
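
The difference between the two restore strategies described above can be sketched in plain Java. This is only an illustration under stated assumptions, not the connector's actual code: the class `RestoreSketch`, the methods `restoreFiltered` and `restoreComplete`, and the use of a `Map<String, Long>` from a partition name to an offset are all hypothetical stand-ins for the consumer's internal state.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; names and types are not taken from the Flink code base.
public class RestoreSketch {

    // Old behavior as described in the PR text: keep only the restored
    // offsets whose partition appears in the freshly queried partition list.
    static Map<String, Long> restoreFiltered(Map<String, Long> restored, Set<String> queried) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> entry : restored.entrySet()) {
            if (queried.contains(entry.getKey())) {
                result.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }

    // New behavior as described in the PR text: use the restored state
    // as-is, without consulting the queried partition list at all.
    static Map<String, Long> restoreComplete(Map<String, Long> restored) {
        return new HashMap<>(restored);
    }

    public static void main(String[] args) {
        Map<String, Long> restored = new HashMap<>();
        restored.put("topic-0", 42L);
        restored.put("topic-1", 17L);

        // Assume the partition query came back incomplete: "topic-1" is missing.
        Set<String> queried = Collections.singleton("topic-0");

        // With filtering, the offset for "topic-1" is silently lost.
        System.out.println(restoreFiltered(restored, queried).containsKey("topic-1")); // prints "false"

        // Without filtering, the offset for "topic-1" survives the restore.
        System.out.println(restoreComplete(restored).containsKey("topic-1")); // prints "true"
    }
}
```

In the sketch, an incomplete query result costs the filtered variant the offset for `topic-1`, while the complete variant keeps it; an unreachable partition is then expected to surface as a failure once the consumer starts running, rather than as lost state.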


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-6006

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3505.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3505
    
----
commit d38e3fd4b123cacf4fd6ee3c1baab77e9b8593a0
Author: Tzu-Li (Gordon) Tai <[email protected]>
Date:   2017-03-10T06:10:56Z

    [FLINK-6006] [kafka] Always use complete restored state in FlinkKafkaConsumer
    
    Previously, the Kafka consumer queried the partition list on
    restore and used it to filter out the restored state of partitions
    that do not exist in the list.
    
    If the returned partition list is incomplete for any reason (e.g.
    partitions that existed before are missing, perhaps due to temporary
    ZK / broker downtime), the state of the missing partitions is dropped
    and can no longer be recovered.
    
    This commit fixes this by always restoring the complete state, without
    any filtering. We simply let the consumer fail if the partitions
    assigned to the consuming threads / Kafka clients are unreachable when
    the consumer starts running.

----

