3. We could avoid force deletions from within Flink. If the user does it,
then we don't give guarantees.

I am fine with your current proposal. +1 for moving forward with it.

Cheers,
Till

On Thu, Oct 1, 2020 at 2:32 AM Yang Wang <danrtsey...@gmail.com> wrote:

> 2. Yes. This is exactly what I mean. Storing the HA information relevant
> to a specific component in a single ConfigMap and ensuring that “Get(check
> the leader)-and-Update(write back to the ConfigMap)” is a transactional
> operation. Since we only store the job graph state handle (not the real
> data) in the ConfigMap, I think 1 MB is big enough for the
> dispatcher-leader ConfigMap (the biggest one, with multiple jobs). By my
> rough calculation, we could have more than 1000 Flink jobs in a Flink
> session cluster.
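
As a rough check of the 1 MB estimate above, here is a minimal sketch in
Python. The per-job entry size (1000 bytes) and the fixed overhead (4 KiB)
are assumptions chosen for illustration, not measured values; real state
handle sizes depend on how they are serialized.

```python
# Back-of-the-envelope capacity check for the dispatcher-leader ConfigMap.
CONFIGMAP_LIMIT_BYTES = 1024 * 1024  # the 1 MB limit discussed in this thread


def max_jobs(bytes_per_job: int, fixed_overhead_bytes: int = 0) -> int:
    """Number of job entries that fit under the size limit."""
    if bytes_per_job <= 0:
        raise ValueError("bytes_per_job must be positive")
    return (CONFIGMAP_LIMIT_BYTES - fixed_overhead_bytes) // bytes_per_job


# Hypothetical sizes: ~1000 bytes per serialized state handle entry and
# 4 KiB reserved for leader information and annotations.
print(max_jobs(bytes_per_job=1000, fixed_overhead_bytes=4096))  # prints 1044
```

Under these assumptions the "more than 1000 jobs" figure holds as long as
each entry stays at or below roughly 1 KB.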
>
> 3. Actually, K8s has a stronger guarantee than YARN. And the StatefulSet
> could provide at-most-one semantics as long as no manual force deletion
> happens[1]. Based on the previous discussion, we have successfully avoided
> the "lock-and-release" in the implementation. So I still insist on using
> the current Deployment.
>
>
> [1].
> https://kubernetes.io/docs/tasks/run-application/force-delete-stateful-set-pod/#force-deletion
>
>
> Best,
> Yang
>
> On Wed, Sep 30, 2020 at 11:57 PM Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Thanks for the clarifications Yang Wang.
>>
>> 2. Keeping the HA information relevant for a component (Dispatcher,
>> JobManager, ResourceManager) in a single ConfigMap sounds good. We should
>> check that we don't exceed the 1 MB size limit with this approach though.
>> The Dispatcher's ConfigMap would then contain the current leader, the
>> running jobs and the pointers to the persisted JobGraphs. The JobManager's
>> ConfigMap would then contain the current leader, the pointers to the
>> checkpoints and the checkpoint ID counter, for example.
>>
>> 3. Ah ok, I somehow thought that K8s would give us stronger
>> guarantees than Yarn in this regard. That's a pity.
>>
>> Cheers,
>> Till
>>
>> On Wed, Sep 30, 2020 at 10:03 AM tison <wander4...@gmail.com> wrote:
>>
>>> Thanks for your explanation. It would be fine as long as checking
>>> leadership and actually writing the information is atomic.
>>>
>>> Best,
>>> tison.
>>>
>>>
>>> On Wed, Sep 30, 2020 at 3:57 PM Yang Wang <danrtsey...@gmail.com> wrote:
>>>
>>>> Thanks Till and tison for your comments.
>>>>
>>>> @Till Rohrmann <trohrm...@apache.org>
>>>> 1. I am afraid we could not do this if we are going to use the fabric8
>>>> Kubernetes client SDK for the leader election. The official Kubernetes Java
>>>> client[1] does not support it either, unless we implement a new
>>>> LeaderElector in Flink based on the very basic Kubernetes API. But it seems
>>>> that we would not gain much from this.
>>>>
>>>> 2. Yes, the implementation will be a little complicated if we want to
>>>> completely eliminate the residual job graphs or checkpoints. Inspired by
>>>> your suggestion, a different solution has come to my mind. We could
>>>> use the same ConfigMap to store the JobManager leader, job graph,
>>>> checkpoint counter, and checkpoints. Each job will have a specific
>>>> ConfigMap for the HA meta storage. Then it will be easier to guarantee
>>>> that only the leader can write the ConfigMap, since “Get(check the
>>>> leader)-and-Update(write back to the ConfigMap)” is a transactional
>>>> operation.
>>>>
>>>> 3. Yes, StatefulSet(1) + ConfigMap + HDFS/S3 is also a solution.
>>>> However, there is still a chance that two JobManagers are running and
>>>> trying to get/delete a key in the same ConfigMap concurrently. Imagine that
>>>> the kubelet (like the NodeManager in YARN) is down, so the JobManager pod
>>>> cannot be deleted, and a new JobManager pod is launched. We are then in
>>>> a situation similar to Deployment(1) + ConfigMap + HDFS/S3. The only
>>>> benefit is that we do not need to implement a leader election/retrieval
>>>> service.
>>>>
>>>> @tison
>>>> Actually, I do not think we will have such an issue in the Kubernetes HA
>>>> service. In the Kubernetes LeaderElector[2], the leader information is
>>>> stored in the annotation of the leader ConfigMap, so the old leader cannot
>>>> wrongly override the leader information. Once a JobManager
>>>> wants to write its leader information to the ConfigMap, it will check
>>>> whether it is the leader now. If not, nothing will happen. Moreover, the
>>>> Kubernetes Resource Version[3] ensures that no one else has snuck in and
>>>> written a different update while the client was in the process of
>>>> performing its update.
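
The check-then-write behaviour described above can be sketched with a small
in-memory model. This is only an illustration in Python: `FakeApiServer`,
`LEADER_ANNOTATION` and `write_if_leader` are hypothetical names, not the
fabric8 or Kubernetes client API, and the integer `resource_version` merely
stands in for the opaque resource version of a real ConfigMap.

```python
from dataclasses import dataclass, field


class Conflict(Exception):
    """Raised when the resource version changed between read and write."""


@dataclass
class ConfigMap:
    resource_version: int = 0
    annotations: dict = field(default_factory=dict)
    data: dict = field(default_factory=dict)


class FakeApiServer:
    """In-memory stand-in for the API server (assumption, not a real client)."""

    def __init__(self) -> None:
        self._cm = ConfigMap()

    def get(self) -> ConfigMap:
        # Return a copy so callers cannot mutate the stored object directly.
        cm = self._cm
        return ConfigMap(cm.resource_version, dict(cm.annotations), dict(cm.data))

    def replace(self, updated: ConfigMap) -> None:
        # Versioned write: reject the update if someone else wrote in between.
        if updated.resource_version != self._cm.resource_version:
            raise Conflict("stale resource version")
        self._cm = ConfigMap(
            updated.resource_version + 1,
            dict(updated.annotations),
            dict(updated.data),
        )


LEADER_ANNOTATION = "leader"  # hypothetical annotation key


def write_if_leader(server: FakeApiServer, me: str, key: str, value: str) -> bool:
    """Get, check leadership, and update; returns False if nothing was written."""
    cm = server.get()
    if cm.annotations.get(LEADER_ANNOTATION) != me:
        return False  # not the leader: nothing happens
    cm.data[key] = value
    try:
        server.replace(cm)
    except Conflict:
        return False  # someone else updated the ConfigMap in the meantime
    return True
```

Because the leadership check and the data update travel in the same versioned
write, a contender that lost leadership after its read fails with a conflict
instead of overwriting the new leader's data.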
>>>>
>>>>
>>>> [1].
>>>> https://github.com/kubernetes-client/java/blob/master/examples/src/main/java/io/kubernetes/client/examples/LeaderElectionExample.java
>>>> [2].
>>>> https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70
>>>> [3].
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink#FLIP144:NativeKubernetesHAforFlink-Resourceversion
>>>>
>>>>
>>>> Best,
>>>> Yang
>>>>
>>>> On Wed, Sep 30, 2020 at 3:21 PM tison <wander4...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Generally +1 for a native k8s HA service.
>>>>>
>>>>> For leader election & publishing leader information, there was a
>>>>> discussion[1] which pointed out that since these two actions are NOT
>>>>> atomic, there will always be edge cases where a previous leader
>>>>> overwrites the leader information, even with versioned writes. A
>>>>> versioned write only helps by forcing a re-read when the version
>>>>> mismatches, so if we want versioned writes to work, the information in
>>>>> the kv pair should help the contender determine whether it is the
>>>>> current leader.
>>>>>
>>>>> The idea of writing the leader information on the contender node or
>>>>> something equivalent makes sense, but the details depend on how it is
>>>>> implemented. General problems are that
>>>>>
>>>>> 1. The TM might be a bit late in picking up the correct leader
>>>>> information, but as long as the leader election process is short and
>>>>> leadership is stable most of the time, it won't be a serious issue.
>>>>> 2. The process by which the TM extracts the leader information might be a
>>>>> bit more complex than directly watching a fixed key.
>>>>>
>>>>> The atomicity issue can be addressed if one leverages low-level APIs such
>>>>> as lease & txn, but that requires more development effort. ConfigMap and
>>>>> the encapsulated interface, though, provide only a self-consistent
>>>>> mechanism which doesn't promise more consistency for extensions.
>>>>>
>>>>> Best,
>>>>> tison.
>>>>>
>>>>> [1]
>>>>> https://lists.apache.org/x/thread.html/594b66ecb1d60b560a5c4c08ed1b2a67bc29143cb4e8d368da8c39b2@%3Cuser.zookeeper.apache.org%3E
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Sep 29, 2020 at 9:25 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>
>>>>>> For 1. I was wondering whether we can't write the leader connection
>>>>>> information directly when trying to obtain the leadership (trying to
>>>>>> update the leader key with one's own value)? This might be a little
>>>>>> detail, though.
>>>>>>
>>>>>> 2. Alright, so we are having a similar mechanism as we have in
>>>>>> ZooKeeper with the ephemeral lock nodes. I guess that this complicates
>>>>>> the implementation a bit, unfortunately.
>>>>>>
>>>>>> 3. Wouldn't the StatefulSet solution also work without a PV? One could
>>>>>> configure a different persistent storage like HDFS or S3 for storing
>>>>>> the checkpoints and job blobs like in the ZooKeeper case. The current
>>>>>> benefit I see is that we avoid having to implement this multi locking
>>>>>> mechanism in the ConfigMaps using the annotations because we can be
>>>>>> sure that there is only a single leader at a time if I understood the
>>>>>> guarantees of K8s correctly.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Tue, Sep 29, 2020 at 8:10 AM Yang Wang <danrtsey...@gmail.com> wrote:
>>>>>>
>>>>>> > Hi Till, thanks for your valuable feedback.
>>>>>> >
>>>>>> > 1. Yes, leader election and storing leader information will use the
>>>>>> > same ConfigMap. When a contender successfully performs a versioned
>>>>>> > annotation update operation on the ConfigMap, it means that it has
>>>>>> > been elected as the leader. It will then write the leader information
>>>>>> > in the callback of the leader elector[1]. The Kubernetes resource
>>>>>> > version will help us prevent the leader ConfigMap from being wrongly
>>>>>> > updated.
>>>>>> >
>>>>>> > 2. The lock and release is really a valid concern. Actually, in the
>>>>>> > current design, we cannot guarantee that the node that tries to write
>>>>>> > its ownership is the real leader. Whoever writes last becomes the
>>>>>> > owner. To address this issue, we need to store all the owners of the
>>>>>> > key. Only when the owner list is empty can the specific key (meaning a
>>>>>> > checkpoint or job graph) be deleted. However, we may have a residual
>>>>>> > checkpoint or job graph when the old JobManager crashed unexpectedly
>>>>>> > and did not release the lock. To solve this problem completely, we
>>>>>> > need a timestamp renew mechanism for CompletedCheckpointStore and
>>>>>> > JobGraphStore, which could help us detect the JobManager timeout and
>>>>>> > then clean up the residual keys.
>>>>>> >
>>>>>> > 3. Frankly speaking, I am not against this solution. However, in my
>>>>>> > opinion, it is more like a temporary proposal. We could use
>>>>>> > StatefulSet to avoid leader election and leader retrieval. But I am
>>>>>> > not sure whether the TaskManager could properly handle the situation
>>>>>> > where the same hostname has different IPs because the JobManager
>>>>>> > failed and was relaunched. Also, we may still have two JobManagers
>>>>>> > running in some corner cases (e.g. the kubelet is down but the pod is
>>>>>> > running). Another concern is that we have a strong dependency on the
>>>>>> > PersistentVolume (aka PV) in FileSystemHAService, but a PV is not
>>>>>> > always available, especially in self-built Kubernetes clusters.
>>>>>> > Moreover, the PV provider should guarantee that each PV can only be
>>>>>> > mounted once. Since the native HA proposal could cover all the
>>>>>> > functionality of the StatefulSet proposal, I prefer the former.
>>>>>> >
>>>>>> >
>>>>>> > [1].
>>>>>> > https://github.com/fabric8io/kubernetes-client/blob/6d83d41d50941bf8f2d4e0c859951eb10f617df6/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/extended/leaderelection/LeaderElector.java#L70
>>>>>> >
>>>>>> > Best,
>>>>>> > Yang
>>>>>> >
>>>>>> > On Mon, Sep 28, 2020 at 9:29 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>> >
>>>>>> >> Thanks for creating this FLIP Yang Wang. I believe that many of our
>>>>>> >> users will like a ZooKeeper-less HA setup.
>>>>>> >>
>>>>>> >> +1 for not separating the leader information and the leader election
>>>>>> >> if possible. Maybe it is even possible that the contender writes his
>>>>>> >> leader information directly when trying to obtain the leadership by
>>>>>> >> performing a versioned write operation.
>>>>>> >>
>>>>>> >> Concerning the lock and release operation I have a question: Can
>>>>>> >> there be multiple owners for a given key-value pair in a ConfigMap?
>>>>>> >> If not, how can we ensure that the node which writes his ownership is
>>>>>> >> actually the leader w/o transactional support from K8s? In ZooKeeper
>>>>>> >> we had the same problem (we should probably change it at some point
>>>>>> >> to simply use a transaction which checks whether the writer is still
>>>>>> >> the leader) and therefore introduced the ephemeral lock nodes. What
>>>>>> >> they allow is that there can be multiple owners of a given ZNode at a
>>>>>> >> time. The last owner will then be responsible for the cleanup of the
>>>>>> >> node.
>>>>>> >>
>>>>>> >> I see the benefit of your proposal over the stateful set proposal
>>>>>> >> because it can support multiple standby JMs. Given the problem of
>>>>>> >> locking key-value pairs it might be simpler to start with this
>>>>>> >> approach where we only have single JM. This might already add a lot
>>>>>> >> of benefits for our users. Was there a specific reason why you
>>>>>> >> discarded this proposal (other than generality)?
>>>>>> >>
>>>>>> >> @Uce it would be great to hear your feedback on the proposal since
>>>>>> >> you already implemented a K8s based HA service.
>>>>>> >>
>>>>>> >> Cheers,
>>>>>> >> Till
>>>>>> >>
>>>>>> >> On Thu, Sep 17, 2020 at 5:06 AM Yang Wang <danrtsey...@gmail.com> wrote:
>>>>>> >>
>>>>>> >>> Hi Xintong and Stephan,
>>>>>> >>>
>>>>>> >>> Thanks a lot for your attention on this FLIP. I will address the
>>>>>> >>> comments inline.
>>>>>> >>>
>>>>>> >>> # Architecture -> One or two ConfigMaps
>>>>>> >>>
>>>>>> >>> Both of you are right. One ConfigMap will make the design and
>>>>>> >>> implementation easier. Actually, in my POC code, I am using just one
>>>>>> >>> ConfigMap (e.g. "k8s-ha-app1-restserver" for the rest server
>>>>>> >>> component) for the leader election and storage. Once a JobManager
>>>>>> >>> wins the election, it will update the ConfigMap with the leader
>>>>>> >>> address and periodically renew the lock annotation to remain the
>>>>>> >>> active leader. I will update the FLIP document, including the
>>>>>> >>> architecture diagram, to avoid the misunderstanding.
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> # HA storage > Lock and release
>>>>>> >>>
>>>>>> >>> This is a valid concern. ZooKeeper ephemeral nodes are deleted by
>>>>>> >>> the ZK server automatically when the client times out, which can
>>>>>> >>> happen in a bad network environment or when the ZK client crashes
>>>>>> >>> unexpectedly. For Kubernetes, we need to implement a similar
>>>>>> >>> mechanism. First, when we want to lock a specific key in the
>>>>>> >>> ConfigMap, we will put the owner identity, lease duration, and renew
>>>>>> >>> time in the ConfigMap annotation. The annotation will be cleaned up
>>>>>> >>> when releasing the lock. When we want to remove a job graph or
>>>>>> >>> checkpoints, one of the following conditions must be satisfied.
>>>>>> >>> If not, the delete operation cannot be done.
>>>>>> >>> * The current instance is the owner of the key.
>>>>>> >>> * The owner annotation is empty, which means the owner has released
>>>>>> >>> the lock.
>>>>>> >>> * The owner annotation has timed out, which usually indicates that
>>>>>> >>> the owner died.
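
The delete conditions listed above could be checked roughly as follows. This
is a minimal sketch in Python that assumes the three conditions are
alternatives (any one of them suffices); `LockAnnotation` and `can_delete`
are hypothetical names, and the fields mirror the owner identity, lease
duration, and renew time mentioned in the message.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class LockAnnotation:
    owner: str               # identity of the instance holding the lock
    lease_duration_s: float  # how long a renewal stays valid, in seconds
    renew_time_s: float      # timestamp of the last renewal, in seconds


def can_delete(lock: Optional[LockAnnotation], me: str, now_s: float) -> bool:
    """Return True if `me` may delete the key guarded by `lock`."""
    if lock is None:
        return True  # annotation is empty: the owner released the lock
    if lock.owner == me:
        return True  # the current instance is the owner of the key
    # The owner has not renewed within the lease duration: treat it as dead.
    return now_s - lock.renew_time_s > lock.lease_duration_s


lock = LockAnnotation(owner="jm-1", lease_duration_s=15.0, renew_time_s=100.0)
print(can_delete(lock, "jm-2", now_s=110.0))  # False, lease still valid
print(can_delete(lock, "jm-2", now_s=120.0))  # True, lease expired
```

In a real implementation the check and the delete would have to be part of
the same versioned update; otherwise the owner could renew the lease between
the check and the delete.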
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> # HA storage > HA data clean up
>>>>>> >>>
>>>>>> >>> Sorry that I did not clearly describe how the HA related ConfigMaps
>>>>>> >>> are retained. Thanks to the Kubernetes OwnerReference[1], we set the
>>>>>> >>> owner of the flink-conf ConfigMap, service and TaskManager pods to
>>>>>> >>> the JobManager Deployment. So when we want to destroy a Flink
>>>>>> >>> cluster, we just need to delete the deployment[2]. For the HA
>>>>>> >>> related ConfigMaps, we do not set the owner, so they are retained
>>>>>> >>> even when we delete the whole Flink cluster.
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> [1].
>>>>>> >>> https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/
>>>>>> >>> [2].
>>>>>> >>> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html#stop-flink-session
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> Best,
>>>>>> >>> Yang
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> On Wed, Sep 16, 2020 at 8:16 PM Stephan Ewen <se...@apache.org> wrote:
>>>>>> >>>
>>>>>> >>>> This is a very cool feature proposal.
>>>>>> >>>>
>>>>>> >>>> One lesson-learned from the ZooKeeper-based HA is that it is overly
>>>>>> >>>> complicated to have the Leader RPC address in a different node than
>>>>>> >>>> the LeaderLock. There is extra code needed to make sure these
>>>>>> >>>> converge and they can be temporarily out of sync.
>>>>>> >>>>
>>>>>> >>>> A much easier design would be to have the RPC address as payload in
>>>>>> >>>> the lock entry (ZNode in ZK), the same way that the leader fencing
>>>>>> >>>> token is stored as payload of the lock.
>>>>>> >>>> I think for the design above it would mean having a single
>>>>>> >>>> ConfigMap for both leader lock and leader RPC address discovery.
>>>>>> >>>>
>>>>>> >>>> This probably serves as a good design principle in general - not
>>>>>> >>>> divide information that is updated together over different
>>>>>> >>>> resources.
>>>>>> >>>>
>>>>>> >>>> Best,
>>>>>> >>>> Stephan
>>>>>> >>>>
>>>>>> >>>>
>>>>>> >>>> On Wed, Sep 16, 2020 at 11:26 AM Xintong Song <tonysong...@gmail.com> wrote:
>>>>>> >>>>
>>>>>> >>>>> Thanks for preparing this FLIP, @Yang.
>>>>>> >>>>>
>>>>>> >>>>> In general, I'm +1 for this new feature. Leveraging Kubernetes's
>>>>>> >>>>> built-in ConfigMap for Flink's HA services should significantly
>>>>>> >>>>> reduce the maintenance overhead compared to deploying a ZK
>>>>>> >>>>> cluster. I think this is an attractive feature for users.
>>>>>> >>>>>
>>>>>> >>>>> Concerning the proposed design, I have some questions. Might not
>>>>>> >>>>> be problems, just trying to understand.
>>>>>> >>>>>
>>>>>> >>>>> ## Architecture
>>>>>> >>>>>
>>>>>> >>>>> Why does the leader election need two ConfigMaps (`lock for
>>>>>> >>>>> contending leader`, and `leader RPC address`)? What happens if the
>>>>>> >>>>> two ConfigMaps are not updated consistently? E.g., a TM learns
>>>>>> >>>>> about a new JM becoming leader (lock for contending leader
>>>>>> >>>>> updated), but still gets the old leader's address when trying to
>>>>>> >>>>> read `leader RPC address`?
>>>>>> >>>>>
>>>>>> >>>>> ## HA storage > Lock and release
>>>>>> >>>>>
>>>>>> >>>>> It seems to me that the owner needs to explicitly release the lock
>>>>>> >>>>> so that other peers can write/remove the stored object. What if
>>>>>> >>>>> the previous owner failed to release the lock (e.g., dead before
>>>>>> >>>>> releasing)? Would there be any problem?
>>>>>> >>>>>
>>>>>> >>>>> ## HA storage > HA data clean up
>>>>>> >>>>>
>>>>>> >>>>> If the ConfigMap is destroyed on `kubectl delete deploy
>>>>>> >>>>> <ClusterID>`, how are the HA data retained?
>>>>>> >>>>>
>>>>>> >>>>>
>>>>>> >>>>> Thank you~
>>>>>> >>>>>
>>>>>> >>>>> Xintong Song
>>>>>> >>>>>
>>>>>> >>>>>
>>>>>> >>>>>
>>>>>> >>>>> On Tue, Sep 15, 2020 at 11:26 AM Yang Wang <danrtsey...@gmail.com> wrote:
>>>>>> >>>>>
>>>>>> >>>>>> Hi devs and users,
>>>>>> >>>>>>
>>>>>> >>>>>> I would like to start the discussion about FLIP-144[1], which
>>>>>> >>>>>> will introduce a new native high availability service for
>>>>>> >>>>>> Kubernetes.
>>>>>> >>>>>>
>>>>>> >>>>>> Currently, Flink provides a ZooKeeper HA service, which is
>>>>>> >>>>>> widely used in production environments. It can be integrated in
>>>>>> >>>>>> standalone cluster, YARN, and Kubernetes deployments. However,
>>>>>> >>>>>> using the ZooKeeper HA in K8s incurs additional cost since we
>>>>>> >>>>>> need to manage a ZooKeeper cluster. In the meantime, K8s provides
>>>>>> >>>>>> some public APIs for leader election[2] and configuration
>>>>>> >>>>>> storage (i.e. ConfigMap[3]). We could leverage these features and
>>>>>> >>>>>> make running an HA-configured Flink cluster on K8s more
>>>>>> >>>>>> convenient.
>>>>>> >>>>>>
>>>>>> >>>>>> Both the standalone on K8s and native K8s deployments could
>>>>>> >>>>>> benefit from the newly introduced KubernetesHaService.
>>>>>> >>>>>>
>>>>>> >>>>>> [1].
>>>>>> >>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink
>>>>>> >>>>>> [2].
>>>>>> >>>>>> https://kubernetes.io/blog/2016/01/simple-leader-election-with-kubernetes/
>>>>>> >>>>>> [3].
>>>>>> >>>>>> https://kubernetes.io/docs/concepts/configuration/configmap/
>>>>>> >>>>>>
>>>>>> >>>>>> Looking forward to your feedback.
>>>>>> >>>>>>
>>>>>> >>>>>> Best,
>>>>>> >>>>>> Yang
>>>>>> >>>>>>
>>>>>> >>>>>
>>>>>>
>>>>>
