cc dev@

Hi Thomas, Hi everyone,

Thank you for starting this discussion and sorry for chiming in late.

I agree with Thomas' and David's assessment of Flink's "Native Kubernetes
Integration". In particular, it does not actually integrate well with the
Kubernetes ecosystem (tooling, security concerns), despite being called
"native".

With respect to common lifecycle management operations: these features are
not available (within Apache Flink) for any of the other resource providers
(YARN, Standalone) either. From this perspective, I wouldn't consider this
a shortcoming of the Kubernetes integration. Instead, we have been focusing
our efforts in Apache Flink on the operations of a single Job, and left
orchestration and lifecycle management that spans multiple Jobs to
ecosystem projects. I still believe that we should keep this focus on low
level composable building blocks (like Jobs and Snapshots) in Apache Flink
to make it easy for everyone to build fitting higher level abstractions
like a FlinkApplication Custom Resource on top of it. For example, we are
currently contributing multiple improvements [1,2,3,4] to the REST API and
Application Mode that in our experience will make it easier to manage
Apache Flink with a Kubernetes operator. Given this background, I suspect a
Kubernetes Operator in Apache Flink would not be a priority for us at
Ververica - at least right now.
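
To illustrate what building on these low level blocks can look like, here is
a minimal Python sketch of how an external controller might prepare a
savepoint trigger for a single Job through the REST API
(POST /jobs/:jobid/savepoints). The JobManager address, the job ID and the
target directory are placeholders, and the helper names are made up for this
example; the request is only built here, not sent.

```python
import json
from urllib import request


def build_savepoint_request(rest_url, job_id, target_dir, cancel_job=False):
    """Build (but do not send) a POST request that triggers a savepoint."""
    url = f"{rest_url.rstrip('/')}/jobs/{job_id}/savepoints"
    # Field names follow the savepoint trigger request body of the REST API.
    body = {"target-directory": target_dir, "cancel-job": cancel_job}
    return request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def savepoint_status_url(rest_url, job_id, trigger_id):
    """URL to poll for the outcome of a previously triggered savepoint."""
    return f"{rest_url.rstrip('/')}/jobs/{job_id}/savepoints/{trigger_id}"


# Placeholder values, assumed for illustration only.
req = build_savepoint_request(
    "http://localhost:8081",
    "00000000000000000000000000000000",
    "s3://my-bucket/savepoints",
)
print(req.get_method(), req.full_url)
```

An operator would send this request, read the request ID from the response
and poll the status URL until the savepoint completes, before it touches the
deployment.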

Having said this, if others in the community have the capacity to push and
*maintain* a somewhat minimal "reference" Kubernetes Operator for Apache
Flink, I don't see any blockers. If or when this happens, I'd see some
clear benefits of using a separate repository (easier independent
versioning and releases, different build system & tooling (Go, I assume)).

Looking forward to your thoughts,

Konstantin

[1] https://issues.apache.org/jira/browse/FLINK-24275
[2] https://issues.apache.org/jira/browse/FLINK-24208
[3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-194%3A+Introduce+the+JobResultStore
[4] https://issues.apache.org/jira/browse/FLINK-24113

On Mon, Jan 10, 2022 at 2:11 PM Gyula Fóra <gyf...@apache.org> wrote:

> Hi All!
>
> This is a very interesting discussion.
>
> I think many users find it confusing which deployment mode to choose when
> considering a new production application on Kubernetes. With all the
> options of native, standalone and different operators this can get tricky :)
>
> I really like the idea that Thomas brought up to have at least a minimal
> operator implementation in Flink itself to cover the most common production
> job lifecycle management scenarios. I think the Flink community has very
> strong experience in this area and could create a successful implementation
> that would benefit most production users on Kubernetes.
>
> Cheers,
> Gyula
>
> On Mon, Jan 10, 2022 at 4:29 AM Yang Wang <danrtsey...@gmail.com> wrote:
>
>> Thanks all for this fruitful discussion.
>>
>> I think Xintong has made a strong point about why we introduced the native
>> K8s integration, which is active resource management.
>> I have a concrete example of this from production. When a K8s node is
>> down, the standalone K8s deployment takes longer to recover, because
>> recovery depends on the K8s eviction time (IIRC, the default is 5 minutes).
>> With the native K8s integration, the Flink RM notices the lost TM
>> heartbeat and can allocate a new TM promptly.
>>
>> Also, when introducing the native K8s integration, another point was that
>> we should make it easy for users to migrate from YARN deployments.
>> They already have a production-ready job life-cycle management system,
>> which is using Flink CLI to submit the Flink jobs.
>> So we provide a consistent command "bin/flink run-application -t
>> kubernetes-application/yarn-application" to start a Flink application and
>> "bin/flink cancel/stop ..."
>> to terminate a Flink application.
>>
>>
>> Compared with a K8s operator, I know that this is not a K8s-native
>> mechanism. Hence, I also agree that we still need a powerful K8s
>> operator which
>> could work with both standalone and native K8s modes. The major
>> difference between them is how to start the JM and TM pods. For standalone,
>> they are managed by a K8s job/deployment. For native, maybe we could simply
>> create a submission carrying the "flink run-application" arguments,
>> which are derived from the Flink application CR.
>>
>> Making Flink's active resource manager talk to the K8s operator is
>> an interesting option, which could support both standalone and native.
>> Then the Flink RM just needs to declare the resource requirements (e.g. 2 *
>> <2G, 1CPU>, 2 * <4G, 1CPU>) and defer the resource allocation/de-allocation
>> to the K8s operator. It feels like an intermediate form between native
>> and standalone mode :)
>>
>>
>>
>> Best,
>> Yang
>>
>>
>>
>> On Fri, Jan 7, 2022 at 12:02 PM Xintong Song <tonysong...@gmail.com> wrote:
>>
>>> Hi folks,
>>>
>>> Thanks for the discussion. I'd like to share my two cents on this topic.
>>>
>>> Firstly, I'd like to clarify my understanding of the concepts "native
>>> k8s integration" and "active resource management".
>>> - Native k8s integration means Flink's master interacts with k8s' api
>>> server directly. It acts like embedding an operator inside Flink's master,
>>> which manages the resources (pod, deployment, configmap, etc.) and watches
>>> / reacts to related events.
>>> - Active resource management means Flink can actively start / terminate
>>> workers as needed. Its key characteristic is that the resource a Flink
>>> deployment uses is decided by the job's execution plan, unlike the opposite
>>> reactive mode (resource available to the deployment decides the execution
>>> plan) or the standalone mode (both execution plan and deployment resources
>>> are predefined).
>>>
>>> Currently, we have the yarn and native k8s deployments (and the recently
>>> removed mesos deployment) in active mode, due to their ability to request /
>>> release worker resources from the underlying cluster. And all the existing
>>> operators, AFAIK, work with a Flink standalone deployment, where Flink
>>> cannot request / release resources by itself.
>>>
>>> From this perspective, I think a large part of the native k8s
>>> integration advantages come from the active mode: being able to better
>>> understand the job's resource requirements and adjust the deployment
>>> resource accordingly. Both fine-grained resource management (customizing TM
>>> resources for different tasks / operators) and adaptive batch scheduler
>>> (rescale the deployment w.r.t. different stages) fall into this category.
>>>
>>> I'm wondering if we can have an operator that also works with the active
>>> mode. Instead of talking to the api server directly for adding / deleting
>>> resources, Flink's active resource manager can talk to the operator (via
>>> CR) about the resources the deployment needs, and let the operator
>>> actually add / remove the resources. The operator should be able to work
>>> with (active) or without (standalone) the information of deployment's
>>> resource requirements. In this way, users are free to choose between active
>>> and reactive (e.g., HPA) rescaling, while always benefiting from the
>>> beyond-deployment lifecycle (upgrades, savepoint management, etc.) and
>>> alignment with the K8s ecosystem (Flink client free, operating via kubectl,
>>> etc.).
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Thu, Jan 6, 2022 at 1:06 PM Thomas Weise <t...@apache.org> wrote:
>>>
>>>> Hi David,
>>>>
>>>> Thank you for the reply and context!
>>>>
>>>> As for workload types and where native integration might fit: I think
>>>> that any k8s native solution that satisfies category 3) can also take
>>>> care of 1) and 2) while the native integration by itself can't achieve
>>>> that. Existence of [1] might serve as further indication.
>>>>
>>>> The k8s operator pattern would be an essential building block for a
>>>> k8s native solution that is interoperable with k8s ecosystem tooling
>>>> like kubectl, which is why [2] and subsequent derived art were
>>>> created. Specifically the CRD allows us to directly express the
>>>> concept of a Flink application consisting of job manager and task
>>>> manager pods along with associated create/update/delete operations.
>>>>
>>>> Would it make sense to gauge interest in having such an operator as part
>>>> of Flink? It appears so from discussions like [3]. I think such an
>>>> addition would significantly lower the barrier to adoption, since, as
>>>> you mentioned, one cannot really run mission critical streaming
>>>> workloads with just the Apache Flink release binaries alone. While it
>>>> is great to have multiple k8s operators to choose from that are
>>>> managed outside Flink, it is unfortunately also evident that today's
>>>> hot operator turns into tomorrow's tech debt. I think such a fate would
>>>> be less likely within the project, when multiple parties can join
>>>> forces and benefit from each other's contributions. There were similar
>>>> considerations and discussions around Docker images in the past.
>>>>
>>>> Out of the features that you listed it is particularly the application
>>>> upgrade that needs to be solved through an external process like an
>>>> operator. The good thing is that many folks have already thought hard
>>>> about this and in existing implementations we see different strategies
>>>> that have their merit and production mileage (certainly applies to
>>>> [2]). We could combine the best of these ideas into a unified
>>>> implementation as part of Flink itself as a starting point.
>>>>
>>>> Cheers,
>>>> Thomas
>>>>
>>>>
>>>> [1] https://github.com/wangyang0918/flink-native-k8s-operator
>>>> [2] https://github.com/lyft/flinkk8soperator
>>>> [3] https://lists.apache.org/thread/fhcr5gj1txcr0fo4s821jkp6d4tk6080
>>>>
>>>>
>>>> On Tue, Jan 4, 2022 at 4:04 AM David Morávek <d...@apache.org> wrote:
>>>> >
>>>> > Hi Thomas,
>>>> >
>>>> > AFAIK there are no specific plans in this direction with the native
>>>> integration, but I'd like to share some thoughts on the topic.
>>>> >
>>>> > In my understanding there are three major groups of workloads in
>>>> Flink:
>>>> >
>>>> > 1) Batch workloads
>>>> > 2) Interactive workloads (Both Batch and Streaming; eg. SQL Gateway /
>>>> Zeppelin Notebooks / VVP ...)
>>>> > 3) "Mission Critical" Streaming workloads
>>>> >
>>>> > I think the native integration fits really well in the first two
>>>> categories. Let's talk about these first:
>>>> >
>>>> > 1) Batch workloads
>>>> >
>>>> > You don't really need to address the upgrade story here. The
>>>> interesting topic is how to "dynamically" adjust parallelism as the
>>>> workload can change between stages. This is where the Adaptive Batch
>>>> Scheduler [1] comes into play. To leverage the scheduler to the full
>>>> extent, it needs to be deployed with the remote shuffle service in place
>>>> [2], so that Flink's Resource Manager can free TaskManagers that are no
>>>> longer needed.
>>>> >
>>>> > This can IMO work really well with the native integration as there is a
>>>> really clear approach on how the Resource Manager should decide on what
>>>> resources should be allocated.
>>>> >
>>>> > 2) Interactive workloads
>>>> >
>>>> > Again, the upgrade story is not really interesting in this scenario.
>>>> For batch workloads, it's basically the same as the above. For streaming
>>>> ones this gets tricky. The main initiative that we currently have in terms of
>>>> auto scaling / re-scaling of the streaming workloads is the reactive mode
>>>> (adaptive scheduler) [3].
>>>> >
>>>> > I can totally see how the reactive mode could be integrated in the
>>>> native integration, but with the application mode, which is not really
>>>> suitable for the interactive workloads. For integration with a session
>>>> cluster, we'd first need to address the "scheduling" problem of how to
>>>> distribute newly available resources between multiple jobs.
>>>> >
>>>> > What's pretty neat here is that the interpreters (Zeppelin, SQL gateway,
>>>> ...) have a really convenient way of spinning up a new cluster inside k8s.
>>>> >
>>>> > 3) "Mission Critical" Streaming workloads
>>>> >
>>>> > This one is IMO the primary reason why one would consider building a
>>>> new operator these days as this needs careful lifecycle management of the
>>>> pipeline. I assume this is also the use case that you're investigating, am
>>>> I correct?
>>>> >
>>>> > I'd second the requirements that you've already stated:
>>>> > a) Resource efficiency - being able to re-scale based on the
>>>> workload, in order to keep up with the input / not waste resources
>>>> > b) Fast recovery
>>>> > c) Application upgrades
>>>> >
>>>> > I personally don't think that the native integration is really
>>>> suitable here. The direction that we're headed is with the standalone
>>>> deployment on Kubernetes + the reactive mode (adaptive scheduler).
>>>> >
>>>> > In theory, if we want to build a really cloud (Kubernetes) native
>>>> stream processor, deploying the pipeline should be as simple as deploying
>>>> any other application. It should also be simple to integrate with CI / CD
>>>> environments and the fast / frequent deploy philosophy.
>>>> >
>>>> > Let's see where we stand and where we can expand from there:
>>>> >
>>>> > a) Resource efficiency
>>>> >
>>>> > We already have the reactive mode in place. This allows you to add /
>>>> remove task managers by adjusting the TM deployment (`kubectl scale ...`)
>>>> and Flink will automatically react to the available resources. This is
>>>> currently only supported with the Application Mode, which is limited to a
>>>> single job (which should be enough for this kind of workload).
>>>> >
>>>> > The re-scaling logic is left completely up to the user and can be as
>>>> simple as setting up an HPA (Horizontal Pod Autoscaler). I tend to think
>>>> that we might want to provide a custom k8s metrics server
>>>> that allows the HPA to query metrics from the JM, to make this more flexible
>>>> and easy to use.
>>>> >
>>>> > While this looks really great in theory, there are still some
>>>> shortcomings that we're actively working on addressing. For this feature to
>>>> be really widely adopted, we need to make the re-scaling experience as fast
>>>> as possible, so we can re-scale often to react to the input rate. This
>>>> could currently be a problem with large RocksDB states as this involves a
>>>> full re-balance of the state (splitting / merging RocksDB instances). The
>>>> k8s operator approach has the same / even worse limitation as it involves
>>>> taking a savepoint and re-building the state from it.
>>>> >
>>>> > b) Fast recovery
>>>> >
>>>> > This is IMO not as different from the native mode (although I'd have
>>>> to check whether RM failover can reuse task managers). This involves
>>>> frequent and fast checkpointing, local recovery (which is still not
>>>> supported in reactive mode, but this will hopefully be addressed soon) and
>>>> working directory efforts [4].
>>>> >
>>>> > c) Application upgrades
>>>> >
>>>> > This is the topic I'm still struggling with a little. Historically
>>>> this involves external lifecycle management (savepoint + submitting a new
>>>> job). I think at the end of the day, with application mode on standalone
>>>> k8s, it could be as simple as updating the docker image of the JM
>>>> deployment.
>>>> >
>>>> > If I think about the simplest upgrade scenario, simple in-place
>>>> restore from the latest checkpoint, it may be fairly simple to implement.
>>>> What I'm struggling with are the more complex upgrade scenarios such as
>>>> dual, blue / green deployment.
>>>> >
>>>> >
>>>> > To sum this up, I'd really love it if Flink could provide a great
>>>> out-of-the-box experience with standalone mode on k8s, one that makes it
>>>> as close to running / operating any other application as possible.
>>>> >
>>>> > I'd really appreciate hearing your thoughts on this topic.
>>>> >
>>>> > [1]
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler
>>>> > [2] https://github.com/flink-extended/flink-remote-shuffle
>>>> > [3]
>>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/elastic_scaling/
>>>> > [4]
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-198%3A+Working+directory+for+Flink+processes
>>>> >
>>>> > Best,
>>>> > D.
>>>> >
>>>> > On Tue, Jan 4, 2022 at 12:44 AM Thomas Weise <t...@apache.org> wrote:
>>>> >>
>>>> >> Hi,
>>>> >>
>>>> >> I was recently looking at the Flink native Kubernetes integration [1]
>>>> >> to get an idea of how it relates to existing operator based solutions
>>>> >> [2], [3].
>>>> >>
>>>> >> Part of the native integration's motivations was simplicity (no extra
>>>> >> component to install), but arguably that is also a shortcoming. The
>>>> >> k8s operator model can offer support for application lifecycle like
>>>> >> upgrade and rescaling, as well as job submission without a Flink
>>>> >> client.
>>>> >>
>>>> >> When using the Flink native integration it would still be necessary
>>>> to
>>>> >> provide that controller functionality. Is the idea to use the native
>>>> >> integration for task manager resource allocation in tandem with an
>>>> >> operator that provides the external controller functionality? If
>>>> >> anyone using the Flink native integration can share experience, I
>>>> >> would be curious to learn more about the specific setup and if there
>>>> >> are plans to expand the k8s native integration capabilities.
>>>> >>
>>>> >> For example:
>>>> >>
>>>> >> * Application upgrade with features such as [4]. Since the job
>>>> manager
>>>> >> is part of the deployment it cannot orchestrate the deployment. It
>>>> >> needs to be the responsibility of an external process. Has anyone
>>>> >> contemplated adding such a component to Flink itself?
>>>> >>
>>>> >> * Rescaling: Theoretically a parallelism change could be performed
>>>> w/o
>>>> >> restart of the job manager pod. Hence, building blocks to trigger and
>>>> >> apply rescaling could be part of Flink itself. Has this been explored
>>>> >> further?
>>>> >>
>>>> >> Yang kindly pointed me to [5]. Is the recommendation/conclusion that
>>>> >> when a k8s operator is already used, then let it be in charge of the
>>>> >> task manager resource allocation? If so, what scenario was the native
>>>> >> k8s integration originally intended for?
>>>> >>
>>>> >> Thanks,
>>>> >> Thomas
>>>> >>
>>>> >> [1]
>>>> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#deployment-modes
>>>> >> [2] https://github.com/lyft/flinkk8soperator
>>>> >> [3] https://github.com/spotify/flink-on-k8s-operator
>>>> >> [4]
>>>> https://github.com/lyft/flinkk8soperator/blob/master/docs/state_machine.md
>>>> >> [5] https://lists.apache.org/thread/8cn99f6n8nhr07n5vqfo880tpm624s5d
>>>>
>>>

-- 

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk
