[ 
https://issues.apache.org/jira/browse/FLINK-18451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17168873#comment-17168873
 ] 

ming li commented on FLINK-18451:
---------------------------------

Yes, it supports consuming from a specified offset. When the task restarts, 
we reset all partitions to the offsets saved in the previous checkpoint 
through an interface.

But it differs from Kafka. In Kafka, a consumer can assign partitions manually 
through the consumer.assign method, and even if multiple consumers manually 
assign the same partition, they can keep consuming without affecting each 
other. Our middleware can only allocate partitions to consumers on the server 
side, much like a Kafka consumer group. When a consumer goes online or 
offline, the server actively redistributes partitions.

This approach meets our at-least-once requirements under normal circumstances. 
However, in the dual-run case, the old consumer still takes part in partition 
allocation and consumes data, so part of our data is never consumed by the 
new task.
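
To make the dual-run effect concrete, here is a minimal sketch of server-side 
allocation. It is not our middleware's code: the round-robin policy, the 
allocate method and the consumer names "old-task" and "new-task" are all 
assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DualRunSketch {

    // Hypothetical server-side policy (an assumption, not the real middleware):
    // spread partitions round-robin over every consumer the server currently
    // considers online.
    static Map<String, List<Integer>> allocate(List<String> consumers, int partitionCount) {
        Map<String, List<Integer>> allocation = new LinkedHashMap<>();
        for (String consumer : consumers) {
            allocation.put(consumer, new ArrayList<>());
        }
        for (int p = 0; p < partitionCount; p++) {
            String owner = consumers.get(p % consumers.size());
            allocation.get(owner).add(p);
        }
        return allocation;
    }

    public static void main(String[] args) {
        // Normal restart: only the new task is online, so it owns every partition.
        System.out.println(allocate(Arrays.asList("new-task"), 4));

        // Dual-run: the old task is still online from the server's point of view,
        // so it keeps a share of the partitions that the new task never sees.
        System.out.println(allocate(Arrays.asList("old-task", "new-task"), 4));
    }
}
```

With four partitions, the first call gives all of them to new-task, while the 
second leaves partitions 0 and 2 with old-task. The new task resets offsets 
only for the partitions it was given, so the data in the other two is 
consumed by the old task and lost to the new job.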

> Flink HA on yarn may appear TaskManager double running when HA is restored
> --------------------------------------------------------------------------
>
>                 Key: FLINK-18451
>                 URL: https://issues.apache.org/jira/browse/FLINK-18451
>             Project: Flink
>          Issue Type: Bug
>          Components: Deployment / YARN
>    Affects Versions: 1.9.0
>            Reporter: ming li
>            Priority: Major
>              Labels: high-availability
>
> We found that when a NodeManager is lost, YARN's ResourceManager restores a 
> new JobManager, which registers itself as the leader in ZooKeeper. The 
> original TaskManager discovers the new JobManager through ZooKeeper and 
> closes its connection to the old JobManager. At this point, all tasks on 
> that TaskManager fail. The new JobManager then performs job recovery 
> directly, restoring from the latest checkpoint.
> However, during recovery, a TaskManager whose connection to ZooKeeper is 
> abnormal does not register with the new JobManager in time. Until one of the 
> following timeouts expires:
> 1. Connection with ZooKeeper
> 2. Heartbeat with JobManager/ResourceManager
> its tasks continue to run (assuming that a task can run independently in 
> the TaskManager). If HA recovers fast enough, some tasks will then run 
> twice at the same time.
> Should we keep a persistent record of the cluster resources allocated at 
> runtime, and use it to confirm that all tasks have stopped when HA is 
> restored?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
