[
https://issues.apache.org/jira/browse/FLINK-6006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15938267#comment-15938267
]
Tzu-Li (Gordon) Tai edited comment on FLINK-6006 at 3/23/17 1:23 PM:
---------------------------------------------------------------------
Hi [~gyfora],
After the restore, could you find any logs like the following:
{{Unable to reach broker after ... retries. Returning all current partitions}}?
This log should be at WARN level.
I'm guessing that the 0.8 consumer will continue to attempt consuming an
unreachable broker even after all retries, in that case this message should be
popping up non-stop for the brokers that are down. So, the complete state is
restored (including the partitions which couldn't be reached due to downtime),
but the 0.8 consumer simply is not failing on the unreachable brokers when it
tries to consume them. This shouldn't cause the consumer state to be broken
like before, though; you'll simply see that the offsets of the unreachable
partitions won't be advancing in the consumer's state.
Also, if DEBUG level happens to be on, could you also find:
- {{Setting restore state in the FlinkKafkaConsumer.}}
- {{Using the following offsets: ...}}
The partitions you see in the second message should be the complete partition
list for each subtask
was (Author: tzulitai):
Hi [~gyfora],
After the restore, could you find any logs like the following:
{{Unable to reach broker after ... retries. Returning all current partitions}}?
This log should be at WARN level.
I'm guessing that the 0.8 consumer will continue to attempt consuming an
unreachable broker even after all retries, in that case this message should be
popping up non-stop for the brokers that are down. So, even though the complete
state is restored (including the partitions which couldn't be reached due to
downtime), the 0.8 consumer might not be failing on the unreachable brokers.
This shouldn't cause the consumer state to be broken like before, though;
you'll simply see that the offsets of the unreachable partitions won't be
advancing in the consumer's state.
Also, if DEBUG level happens to be on, could you also find:
- {{Setting restore state in the FlinkKafkaConsumer.}}
- {{Using the following offsets: ...}}
The partitions you see in the second message should be the complete partition
list for each subtask
> Kafka Consumer can lose state if queried partition list is incomplete on
> restore
> --------------------------------------------------------------------------------
>
> Key: FLINK-6006
> URL: https://issues.apache.org/jira/browse/FLINK-6006
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector, Streaming Connectors
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Blocker
> Fix For: 1.1.5, 1.2.1
>
>
> In 1.1.x and 1.2.x, the FlinkKafkaConsumer performs partition list querying
> on restore. Then, only restored state of partitions that exists in the
> queried list is used to initialize the fetcher's state holders.
> If in any case the returned partition list is incomplete (i.e. missing
> partitions that existed before, perhaps due to temporary ZK / broker
> downtime), then the state of the missing partitions is dropped and cannot be
> recovered anymore.
> In 1.3-SNAPSHOT, this is fixed by changes in FLINK-4280, so only 1.1 and 1.2
> is affected.
> We can backport some of the behavioural changes there to 1.1 and 1.2.
> Generally, we should not depend on the current partition list in Kafka when
> restoring, but just restore all previous state into the fetcher's state
> holders.
> This would therefore also require some checking on how the consumer threads /
> Kafka clients behave when its assigned partitions cannot be reached.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)