Hi David,

Thank you for the reply and context!

As for workload types and where the native integration might fit: I
think that any k8s native solution that satisfies category 3) can also
cover 1) and 2), while the native integration by itself cannot. The
existence of [1] might serve as further indication.

The k8s operator pattern would be an essential building block for a
k8s native solution that is interoperable with k8s ecosystem tooling
like kubectl, which is why [2] and subsequent derived art were
created. Specifically, the CRD allows us to directly express the
concept of a Flink application, consisting of job manager and task
manager pods, along with the associated create/update/delete operations.
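
To make that concrete, here is a rough sketch of what such a custom
resource could look like (purely hypothetical: the API group, kind and
field names are made up for illustration and do not reflect the actual
schema of [2]):

  apiVersion: flink.example.org/v1alpha1     # hypothetical API group
  kind: FlinkApplication
  metadata:
    name: my-streaming-app
  spec:
    image: my-registry/my-streaming-app:1.0.0  # job jar + Flink runtime
    parallelism: 4
    jobManager:
      resources: { cpu: "1", memory: 2Gi }
    taskManager:
      replicas: 2
      resources: { cpu: "2", memory: 4Gi }
    upgradeMode: savepoint  # take a savepoint, then redeploy from it

A controller watching such resources would then reconcile
create/update/delete events into the corresponding job manager and
task manager pods.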

Would it make sense to gauge interest in having such an operator as
part of Flink? It appears so from discussions like [3]. I think such
an addition would significantly lower the barrier to adoption, since,
as you mentioned, one cannot really run mission-critical streaming
workloads with the Apache Flink release binaries alone. While it is
great to have multiple k8s operators to choose from that are managed
outside Flink, it is unfortunately also evident that today's hot
operator turns into tomorrow's tech debt. I think such a fate would be
less likely within the project, where multiple parties can join forces
and benefit from each other's contributions. There were similar
considerations and discussions around Docker images in the past.

Out of the features that you listed, it is particularly the application
upgrade that needs to be solved through an external process such as an
operator. The good news is that many folks have already thought hard
about this, and in existing implementations we see different strategies
that have their merits and production mileage (this certainly applies
to [2]). We could combine the best of these ideas into a unified
implementation within Flink itself as a starting point.
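
For illustration, the common denominator of these strategies is some
variant of the savepoint-based upgrade flow, sketched below with the
stock Flink CLI (paths and names are placeholders; an operator would
drive the same steps through the REST API):

  # 1) take a savepoint and stop the running job
  flink stop --savepointPath s3://bucket/savepoints <job-id>
  # 2) roll out the new image / jar (update the JM and TM deployments)
  # 3) resubmit the job from the savepoint
  flink run --fromSavepoint s3://bucket/savepoints/savepoint-xyz -d my-app.jar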

Cheers,
Thomas


[1] https://github.com/wangyang0918/flink-native-k8s-operator
[2] https://github.com/lyft/flinkk8soperator
[3] https://lists.apache.org/thread/fhcr5gj1txcr0fo4s821jkp6d4tk6080


On Tue, Jan 4, 2022 at 4:04 AM David Morávek <d...@apache.org> wrote:
>
> Hi Thomas,
>
> AFAIK there are no specific plans in this direction with the native 
> integration, but I'd like to share some thoughts on the topic.
>
> In my understanding there are three major groups of workloads in Flink:
>
> 1) Batch workloads
> 2) Interactive workloads (both Batch and Streaming; e.g. SQL Gateway / 
> Zeppelin Notebooks / VVP ...)
> 3) "Mission Critical" Streaming workloads
>
> I think the native integration fits really well in the first two categories. 
> Let's talk about these first:
>
> 1) Batch workloads
>
> You don't really need to address the upgrade story here. The interesting 
> topic is how to "dynamically" adjust parallelism, as the workload can change 
> between stages. This is where the Adaptive Batch Scheduler [1] comes into 
> play. To leverage the scheduler to the full extent, it needs to be deployed 
> with the remote shuffle service in place [2], so that Flink's Resource 
> Manager can free TaskManagers that are no longer needed.
>
> This can IMO work really well with the native integration, as there is a 
> clear approach to how the Resource Manager should decide what resources to 
> allocate.
>
> 2) Interactive workloads
>
> Again, the upgrade story is not really interesting in this scenario. For 
> batch workloads, it's basically the same as the above. For streaming ones it 
> gets tricky. The main initiative that we currently have in terms of auto 
> scaling / re-scaling of streaming workloads is the reactive mode (adaptive 
> scheduler) [3].
>
> I can totally see how the reactive mode could be integrated with the native 
> integration, but only in application mode, which is not really suitable for 
> interactive workloads. For integration with session clusters, we'd first 
> need to address the "scheduling" problem of how to distribute newly 
> available resources between multiple jobs.
>
> What's pretty neat here is that the interpreters (Zeppelin, SQL Gateway, 
> ...) have a really convenient way of spinning up a new cluster inside k8s.
>
> 3) "Mission Critical" Streaming workloads
>
> This one is IMO the primary reason why one would consider building a new 
> operator these days, as this workload needs careful lifecycle management of 
> the pipeline. I assume this is also the use case that you're investigating, 
> am I correct?
>
> I'd second the requirements that you've already stated:
> a) Resource efficiency - being able to re-scale based on the workload, in 
> order to keep up with the input / not waste resources
> b) Fast recovery
> c) Application upgrades
>
> I personally don't think that the native integration is really suitable 
> here. The direction we're headed in is standalone deployment on Kubernetes + 
> the reactive mode (adaptive scheduler).
>
> In theory, if we want to build a really cloud (Kubernetes) native stream 
> processor, deploying the pipeline should be as simple as deploying any other 
> application. It should also be simple to integrate with CI/CD environments 
> and a fast / frequent deploy philosophy.
>
> Let's see where we stand and where we can expand from there:
>
> a) Resource efficiency
>
> We already have the reactive mode in place. This allows you to add / remove 
> task managers by adjusting the TM deployment (`kubectl scale ...`), and 
> Flink will automatically react to the available resources. This is currently 
> only supported with the Application Mode, which is limited to a single job 
> (that should be enough for this kind of workload).
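>
> For example, with
>
>   scheduler-mode: reactive
>
> set in flink-conf.yaml, scaling out the TMs is just (the deployment name 
> is illustrative):
>
>   kubectl scale deployment/flink-taskmanager --replicas=5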
>
> The re-scaling logic is left completely up to the user and can be as simple 
> as setting up an HPA (Horizontal Pod Autoscaler). I tend to think that we 
> might want to provide a custom k8s metrics server that allows the HPA to 
> query metrics from the JM, to make this more flexible and easier to use.
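>
> As a sketch of what that could enable (everything here is hypothetical: 
> the metric name assumes such an adapter exposing a busy-time ratio from 
> the JM):
>
>   apiVersion: autoscaling/v2beta2
>   kind: HorizontalPodAutoscaler
>   metadata:
>     name: flink-taskmanager
>   spec:
>     scaleTargetRef:
>       apiVersion: apps/v1
>       kind: Deployment
>       name: flink-taskmanager
>     minReplicas: 2
>     maxReplicas: 10
>     metrics:
>     - type: Pods
>       pods:
>         metric:
>           name: flink_task_busy_time_ratio  # hypothetical adapter metric
>         target:
>           type: AverageValue
>           averageValue: "800m"  # scale out when tasks are >80% busy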
>
> While this looks really great in theory, there are still some shortcomings 
> that we're actively working on addressing. For this feature to be widely 
> adopted, we need to make the re-scaling experience as fast as possible, so 
> we can re-scale often to react to the input rate. This is currently a 
> problem with large RocksDB state, as re-scaling involves a full 
> re-distribution of the state (splitting / merging RocksDB instances). The 
> k8s operator approach has the same or an even worse limitation, as it 
> involves taking a savepoint and re-building the state from it.
>
> b) Fast recovery
>
> This is IMO not that different from the native mode (although I'd have to 
> check whether RM failover can reuse task managers). It involves frequent and 
> fast checkpointing, local recovery (which is still not supported in reactive 
> mode, but will hopefully be addressed soon), and the working directory 
> efforts [4].
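>
> To make this concrete, the relevant knobs would be something along the 
> lines of the following flink-conf.yaml snippet (values are illustrative):
>
>   execution.checkpointing.interval: 10s  # frequent, fast checkpoints
>   state.backend.local-recovery: true     # recover from local state copies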
>
> c) Application upgrades
>
> This is the topic I'm still struggling with a little. Historically, this has 
> involved external lifecycle management (savepoint + submitting a new job). I 
> think at the end of the day, with application mode on standalone k8s, it 
> could be as simple as updating the docker image of the JM deployment.
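>
> I.e. something along the lines of (resource and container names are 
> illustrative):
>
>   kubectl set image deployment/flink-jobmanager \
>       jobmanager=my-registry/my-job:v2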
>
> If I think about the simplest upgrade scenario, an in-place restore from 
> the latest checkpoint, it may be fairly simple to implement. What I'm 
> struggling with are the more complex upgrade scenarios, such as dual or 
> blue / green deployments.
>
>
> To sum this up, I'd really love it if Flink could provide a great 
> out-of-the-box experience with standalone mode on k8s that makes the 
> experience as close as possible to running / operating any other 
> application.
>
> I'd really appreciate hearing your thoughts on this topic.
>
> [1] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler
> [2] https://github.com/flink-extended/flink-remote-shuffle
> [3] 
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/elastic_scaling/
> [4] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-198%3A+Working+directory+for+Flink+processes
>
> Best,
> D.
>
> On Tue, Jan 4, 2022 at 12:44 AM Thomas Weise <t...@apache.org> wrote:
>>
>> Hi,
>>
>> I was recently looking at the Flink native Kubernetes integration [1]
>> to get an idea of how it relates to existing operator based solutions
>> [2], [3].
>>
>> Part of the native integration's motivation was simplicity (no extra
>> component to install), but arguably that is also a shortcoming. The
>> k8s operator model can offer support for application lifecycle
>> operations like upgrade and rescaling, as well as job submission
>> without a Flink client.
>>
>> When using the Flink native integration, it would still be necessary
>> to provide that controller functionality. Is the idea to use the
>> native integration for task manager resource allocation in tandem
>> with an operator that provides the external controller functionality?
>> If anyone using the Flink native integration can share their
>> experience, I would be curious to learn more about the specific setup
>> and whether there are plans to expand the k8s native integration's
>> capabilities.
>>
>> For example:
>>
>> * Application upgrade with features such as [4]. Since the job manager
>> is part of the deployment, it cannot orchestrate its own upgrade; that
>> needs to be the responsibility of an external process. Has anyone
>> contemplated adding such a component to Flink itself?
>>
>> * Rescaling: Theoretically, a parallelism change could be performed
>> w/o restarting the job manager pod. Hence, building blocks to trigger
>> and apply rescaling could be part of Flink itself. Has this been
>> explored further?
>>
>> Yang kindly pointed me to [5]. Is the recommendation/conclusion that
>> when a k8s operator is already in use, it should be in charge of the
>> task manager resource allocation? If so, what scenario was the native
>> k8s integration originally intended for?
>>
>> Thanks,
>> Thomas
>>
>> [1] 
>> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#deployment-modes
>> [2] https://github.com/lyft/flinkk8soperator
>> [3] https://github.com/spotify/flink-on-k8s-operator
>> [4] 
>> https://github.com/lyft/flinkk8soperator/blob/master/docs/state_machine.md
>> [5] https://lists.apache.org/thread/8cn99f6n8nhr07n5vqfo880tpm624s5d
