Re: Flink auto-scaling feature and documentation suggestions
Thank you for answering all my questions. My suggestion would be to start off by exposing an API that allows dynamically changing operator parallelism, since Flink users are better placed to decide the right scaling policy. Once that functionality is there, it's just a matter of providing policies (ratio-based, throughput-based, back-pressure-based). The web UI could be used for setting parallelism as well. An analogy would be the autoscaling provided by cloud providers, where the features are:

1. A web UI for manually overriding parallelism (min, max, desired)
2. Metric-based scaling policies

It will be difficult for developers to come up with a reasonable value of maxParallelism for each operator, and as I explained above, sometimes even a small increase in parallelism is enough to bring things down. A UI / external-policy-based approach allows for quick experimentation and fine tuning. I don't think it will be possible for the Flink developers to build a one-size-fits-all solution.
Re: Flink auto-scaling feature and documentation suggestions
We do exactly what you mentioned. However, it's unfortunately not that simple. Our services don't have predictable performance, as traffic varies a lot during the day. As I explained above, increasing source parallelism to 2 was enough to tip over our services, and reducing the parallelism of the async operator to 2 is not an option.
Re: Flink auto-scaling feature and documentation suggestions
I am using the async IO operator. The problem is that increasing source parallelism from 1 to 2 was enough to tip our systems over the edge, and reducing the parallelism of the async IO operator to 2 is not an option as that would reduce throughput quite a bit. This means that no matter what we do, we'll end up with different operators running at different parallelism.

What I meant by "running all operators at such a high scale would result in wastage of resources, even with operator chaining in place" was that creating as many sub-tasks for each of my operators as the windowing operator has would lead to sub-optimal performance. While chaining would ensure that the chained tasks run in one slot, the partitioning of data would still cause the same network IO, since chaining doesn't guarantee that the same tuple is processed in one slot. In my experience, running every operator at the same parallelism is always inferior to hand-tuned parallelism.
Re: Flink auto-scaling feature and documentation suggestions
Yes. While back-pressure would eventually ensure high throughput, hand-tuning parallelism became necessary because a job with high source parallelism would immediately bring down our internal services, not giving Flink enough time to adjust the in-rate. Plus, running all operators at such a high scale would result in wastage of resources, even with operator chaining in place. That's why I think more toggles are needed for the current auto-scaling to truly shine.
Re: Flink auto-scaling feature and documentation suggestions
In one of my jobs, windowing is the costliest operation, while the upstream and downstream operators are not as resource intensive. There's another operator in this job that communicates with internal services. It has high parallelism as well, but not as high as the windowing operation. Running all operators with the same parallelism as the windowing operation would choke some of our internal services, as we'd be consuming from our source at a rate much higher than those services can handle. Thus our source, sink, validation and monitoring related operators have very low parallelism, while one operator has high parallelism and another has even higher parallelism.
Re: Flink auto-scaling feature and documentation suggestions
Forgot to add one more question:

7. If maxParallelism needs to be set to control parallelism, wouldn't that mean we could never take a savepoint and rescale beyond the configured maxParallelism? That would mean we can never achieve hand-tuned resource efficiency: I would need to set maxParallelism well above the current parallelism, and given the current tendency to allocate the same number of sub-tasks for each operator, I would inevitably end up with several under-utilized operators.
Re: Flink auto-scaling feature and documentation suggestions
Some questions about the adaptive scheduling documentation. It says: "If new slots become available the job will be scaled up again, up to the configured parallelism." Does "parallelism" here refer to maxParallelism or to parallelism? I'm guessing it's the latter, because the doc later mentions: "In Reactive Mode (see above) the configured parallelism is ignored and treated as if it was set to infinity, letting the job always use as many resources as possible."

The other question is along the same lines as the one I asked earlier: what strategy is used to allocate sub-tasks? Is it the ratio of the configured parallelism, or does it try to achieve as much operator chaining as possible?
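For context, the configuration I'm referring to is roughly the following (my understanding of the 1.13 docs; I'm quoting the keys from memory, so please correct me if they're off):

# Adaptive scheduler: rescale when slots come and go, up to the configured parallelism
jobmanager.scheduler: adaptive

# Reactive Mode (standalone deployments only): configured parallelism is treated as infinite
scheduler-mode: reactive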
Flink auto-scaling feature and documentation suggestions
Thank you very much for the new release that makes auto-scaling possible. I'm currently running multiple Flink jobs and have hand-tuned the parallelism of each operator to achieve the best throughput (a rough sketch of such a job is at the end of this mail). I would much rather use the auto-scaling capabilities of Flink than hand-tune my jobs, but it seems there are a few gaps:

1. Setting max parallelism seems to be the only user-controlled knob at the moment. As Flink tries to achieve operator chaining by launching the same number of sub-tasks for each operator, I'm afraid the current auto-scaling will be very inefficient. At a minimum, we need to support user-provided ratios that are used to distribute sub-tasks among operators. E.g. O1:O2 = 4:1 would mean that 4 sub-tasks of O1 should be started for each sub-task of O2.

2. Allow an external system to set the parallelism of operators. Perhaps the job manager's REST API could be extended to support such scaling requests.

3. The doc says that local recovery doesn't work. This makes sense when a restart is due to a scaling action, but I couldn't quite understand why that needs to be the case when a task manager is recovering from a crash.

4. Is there any metric that allows us to distinguish between a restart due to scaling and a restart due to some other reason? Based on the section on limitations there isn't, but it would be good to add one, as people will eventually want to monitor and alert on restarts due to failures alone.

5. Suppose the number of containers is fixed and the job is running. Will Flink internally rebalance by adding sub-tasks of one operator and removing sub-tasks of another? This could be driven by back-pressure, for instance. The doc doesn't mention this, so I'm assuming the current scaling is designed to maximize operator chaining. However, it does make sense to incorporate back-pressure into rebalancing. Should future versions of auto-scaling work this way, we'll need some toggles to avoid restart loops.

6. How is the implementation different from taking a savepoint and manually rescaling? Are there any operator-specific gotchas that we should watch out for? For instance, we use the AsyncIO operator and want to know how in-flight requests to a database are handled when its parallelism changes.

Once again, thank you for your continued support!
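For concreteness, here is a minimal sketch of the kind of hand-tuned job I mean. The numbers, the socket source and the Enrich function are placeholders; the real job reads from Kafka and calls our internal services from the async stage:

import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object HandTunedJob {

  // Placeholder async enrichment; the real one talks to our internal services.
  class Enrich extends AsyncFunction[String, (String, Long)] {
    override def asyncInvoke(in: String, resultFuture: ResultFuture[(String, Long)]): Unit =
      resultFuture.complete(Seq((in, 1L)))
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val source = env.socketTextStream("localhost", 9999) // stand-in for the Kafka source
      .setParallelism(1)                                 // kept low to protect internal services

    val enriched = AsyncDataStream
      .unorderedWait(source, new Enrich, 30, TimeUnit.SECONDS, 100)
      .setParallelism(8)                                 // high, but below the windowing operator

    enriched
      .keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
      .sum(1)
      .setParallelism(32)                                // windowing: the costliest operator
      .setMaxParallelism(128)                            // head-room for future rescaling
      .print()
      .setParallelism(2)

    env.execute("hand-tuned parallelism sketch")
  }
}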
Zookeeper or Kubernetes for HA?
The Flink docs provide details on setting up HA but don't make any recommendation as such. For jobs running in Kubernetes, with a ZooKeeper deployment already available, which high-availability option would be more desirable?
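For reference, the two setups I'm comparing look roughly like this in flink-conf.yaml (bucket and cluster-id are placeholders, and I may have some keys slightly off):

# Option 1: ZooKeeper HA, reusing the existing ensemble
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
high-availability.storageDir: s3://my-bucket/flink/ha

# Option 2: Kubernetes HA (Flink 1.12+), no ZooKeeper needed, but the job manager's
# service account needs RBAC permissions to edit ConfigMaps
kubernetes.cluster-id: my-flink-job
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: s3://my-bucket/flink/ha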
Re: [ANNOUNCE] Apache Flink 1.13.0 released
This is a very big release! Many thanks to the Flink developers for their contributions to making Flink as good a framework as it is!
Re: Dynamic configuration via broadcast state
I researched a bit more, and another suggested solution is to build a custom source function that somehow waits for each operator to load its configuration, which is in fact set in the open method of the source itself. I'm not sure that's a good idea, as it exposes the entire job configuration to an operator.

Can we leverage watermarks / idle sources somehow? Basically, set the timestamp of the configuration stream to a very low number at the start and then force it to be read before data from the other sources starts flowing in. As configurations aren't going to change frequently, we can idle these sources.

1. Is the above approach even possible?
2. Can an idle source resume once the configuration changes?

A rough sketch of the timestamp assignment and of re-activating an idle source would help!
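In case it helps the discussion, here is roughly what I have in mind for the timestamp assignment (the Config type and the five-minute idleness are placeholders); what I still don't know is whether the stream becomes active again on the next configuration element:

import java.time.Duration

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}

case class Config(payload: String) // placeholder for the real configuration type

object ConfigWatermarks {
  // Give configuration elements the smallest possible timestamp so they sort
  // "before" all data, and mark the stream idle so it doesn't hold back the
  // watermark between (rare) configuration changes.
  val strategy: WatermarkStrategy[Config] =
    WatermarkStrategy
      .forMonotonousTimestamps[Config]()
      .withTimestampAssigner(new SerializableTimestampAssigner[Config] {
        override def extractTimestamp(element: Config, recordTimestamp: Long): Long =
          Long.MinValue
      })
      .withIdleness(Duration.ofMinutes(5))
}

// Usage: configSource.assignTimestampsAndWatermarks(ConfigWatermarks.strategy)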
Dynamic configuration via broadcast state
I have to make my Flink job dynamically configurable and I'm thinking about using broadcast state. My current static job configuration file contains the configuration of the entire set of operators, which I load into a case class; I then explicitly pass the relevant part to each operator as constructor parameters. Will I have to create individual broadcast streams for each operator? I.e.

val o1conf: BroadcastStream[O1Conf] = ...
someStream.connect(o1conf).map(...)
someOtherStream.connect(o2conf).flatMap(...)

and so on?

1. Is there a way to load the configuration as a whole but only pick the right subset in the connect call, like so: someStream.connect(jobConfig.o1Conf).map(...)? My job has several operators, and it seems rather clumsy to have to instantiate one broadcast stream for each dynamically configurable operator.

2. Is there a way to guarantee that processElement isn't called before the first processBroadcastElement has been called? How else can we ensure that each operator always starts with a valid configuration? Passing the same configuration as constructor parameters is one way to deal with it, but it's clumsy because it just repeats code. Loading the configuration in the open method is even worse, because each operator would then have access to the entire job configuration.

3. What can we do to make source and sink connectors dynamically configurable?
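Regarding point 2, the only pattern I've found so far is to buffer the non-broadcast elements until the first configuration arrives. A minimal sketch of that idea (Event, O1Conf and the process method are placeholders):

import org.apache.flink.api.common.state.{ListStateDescriptor, MapStateDescriptor}
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

case class Event(key: String, value: Long) // placeholder data type
case class O1Conf(threshold: Long)         // placeholder configuration type

object O1Function {
  // Descriptor shared by the broadcast side and the processing side.
  val confDescriptor = new MapStateDescriptor[String, O1Conf](
    "o1-conf", classOf[String], classOf[O1Conf])
}

class O1Function extends KeyedBroadcastProcessFunction[String, Event, O1Conf, Event] {
  import O1Function.confDescriptor

  // Keyed buffer for events that arrive before the first configuration element.
  private lazy val pending = getRuntimeContext.getListState(
    new ListStateDescriptor[Event]("pending-events", classOf[Event]))

  private def process(e: Event, conf: O1Conf): Event = e // placeholder for the real logic

  override def processBroadcastElement(
      conf: O1Conf,
      ctx: KeyedBroadcastProcessFunction[String, Event, O1Conf, Event]#Context,
      out: Collector[Event]): Unit =
    ctx.getBroadcastState(confDescriptor).put("current", conf)

  override def processElement(
      event: Event,
      ctx: KeyedBroadcastProcessFunction[String, Event, O1Conf, Event]#ReadOnlyContext,
      out: Collector[Event]): Unit = {
    val conf = ctx.getBroadcastState(confDescriptor).get("current")
    if (conf == null) {
      pending.add(event) // no configuration yet on this sub-task: buffer
    } else {
      // Flush anything buffered for this key before the configuration arrived.
      pending.get().asScala.foreach(e => out.collect(process(e, conf)))
      pending.clear()
      out.collect(process(event, conf))
    }
  }
}

// Wiring (with the usual org.apache.flink.streaming.api.scala._ import in scope):
// val confStream = configSource.broadcast(O1Function.confDescriptor)
// events.keyBy(_.key).connect(confStream).process(new O1Function)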
Re: Do data streams partitioned by same keyBy but from different operators converge to the same sub-task?
I've gone through the example as well as the documentation and I still couldn't understand whether my use case requires joining.

1. What would happen if I didn't join?
2. As the two incoming data streams have the same type, if joining is absolutely necessary, then a union (oneStream.union(anotherStream)) followed by a keyBy should be good enough, right? I'm asking because I would prefer to use the simple RichMapFunction or RichFlatMapFunction as opposed to the RichCoFlatMapFunction.

Thanks a lot!
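To make question 2 concrete, this is the kind of thing I'd like to write; keyed state is shared across both inputs because union + keyBy routes equal keys to the same sub-task (Record and the counting logic are just placeholders; the same should work with a RichMapFunction holding a ValueState):

import org.apache.flink.streaming.api.scala._

object UnionExample {
  case class Record(key: String, value: Long) // placeholder shared element type

  // Counts elements per key across both inputs.
  def countAcrossInputs(oneStream: DataStream[Record],
                        anotherStream: DataStream[Record]): DataStream[(String, Long)] =
    oneStream.union(anotherStream)
      .keyBy(_.key)
      .mapWithState[(String, Long), Long] { (r, countSoFar) =>
        val next = countSoFar.getOrElse(0L) + 1
        ((r.key, next), Some(next))
      }
}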
Re: Do data streams partitioned by same keyBy but from different operators converge to the same sub-task?
Let me make the example more concrete. Say O1 takes as input a data stream T1 which it splits into two using some function, producing DataStreams of types T2 and T3, each of which is partitioned by the same key function TK. Now, after O2 processes a stream, it sometimes sends the result to O3 (T4), using the same key function again. I want to know:

1. Whether data from stream T3 with key K and data from stream T4 with key K end up affecting the state variables for the same key K, or different ones. I would think they do, but wanted confirmation.

2. Whether an explicit join is needed, i.e. whether this achieves what I want:

result2 = T1.filter(fn2).keyBy(TK).map(richfn2).keyBy(TK).map(whatever O3 does)
result3 = T1.filter(fn3).keyBy(TK).map(whatever O3 does)
Do data streams partitioned by same keyBy but from different operators converge to the same sub-task?
Suppose I have a job with 3 operators and the following job graph:

O1 => O2 // data stream partitioned by keyBy
O1 => O3 // data stream partitioned by keyBy
O2 => O3 // data stream partitioned by keyBy

If operator O3 receives inputs from two operators, and both inputs have the same type and value for a key, will the two streams end up in the same sub-task and therefore affect the same state variables keyed to that particular key? Do the streams themselves have to have the same type, or is it enough that just the keys of the input streams have the same type and value? If they're not guaranteed to affect the same state, how can we achieve that? I would prefer to model my operators with the simple RichMapFunction/RichFlatMapFunction rather than any join function.
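In case a sketch makes the question clearer, this is what I'd like to be true: A and B below stand for the (possibly different) types flowing into O3, and an A and a B with the same key should see the same "seen" state (all names are placeholders):

import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object KeyedConnectExample {
  case class A(key: String, x: Int)    // placeholder for what O1 sends to O3
  case class B(key: String, y: String) // placeholder for what O2 sends to O3

  // Keyed state is scoped to the key, not to the input, so both flatMap1 and
  // flatMap2 read and write the same value for a given key.
  class MergeByKey extends RichCoFlatMapFunction[A, B, String] {
    private lazy val seen = getRuntimeContext.getState(
      new ValueStateDescriptor[String]("seen", classOf[String]))

    override def flatMap1(a: A, out: Collector[String]): Unit = {
      seen.update(s"A(${a.x})")
      out.collect(s"${a.key}: ${seen.value()}")
    }

    override def flatMap2(b: B, out: Collector[String]): Unit =
      out.collect(s"${b.key}: last A seen = ${Option(seen.value()).getOrElse("none")}")
  }

  def wire(fromO1: DataStream[A], fromO2: DataStream[B]): DataStream[String] =
    fromO1.keyBy(_.key).connect(fromO2.keyBy(_.key)).flatMap(new MergeByKey)
}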
Re: Will job manager restarts lead to repeated savepoint restoration?
I don't know how to reproduce it, but what I've observed are three kinds of termination when connectivity with ZooKeeper is somehow disrupted. I don't think it's an issue with ZooKeeper itself, as the same ensemble has supported a much bigger Kafka cluster for a few years.

1. The first kind is exactly this: https://github.com/apache/flink/pull/11338. Basically, a temporary loss of connectivity or a rolling upgrade of ZooKeeper causes the job to terminate. It eventually restarts from where it left off.

2. The second kind is when the job terminates and restarts for the same reason but is unable to recover from the checkpoint. I think it's similar to this: https://issues.apache.org/jira/browse/FLINK-19154. If upgrading to 1.12.0 (from 1.11.2) will fix this second issue, then I'll upgrade.

3. The third kind is where the job restarts repeatedly because it's unable to establish a session with ZooKeeper. I don't know if reducing the session timeout would help here, but in this case I'm forced to disable ZooKeeper HA entirely, as the job cannot even restart.

I could create a JIRA ticket for discussing the ZooKeeper issue itself if you suggest, but the ZooKeeper and savepoint issues are related, as I'm not sure what will happen in each of the cases above.
Re: Challenges Deploying Flink With Savepoints On Kubernetes
Thanks for your reply! What I have seen is that the job terminates when there's an intermittent loss of connectivity with ZooKeeper. This is in fact the most common reason why our jobs are terminating at this point. Worse, the job is unable to restore from a checkpoint during some (not all) of these terminations. Under these scenarios, won't the job try to recover from the savepoint? I've gone through the various tickets reporting stability issues due to ZooKeeper that you mentioned you intend to resolve soon. But until the ZooKeeper-based HA is stable, should we assume that the job will repeatedly restore from savepoints? I would rather rely on Kafka offsets to resume where it left off than on savepoints.
Re: Will job manager restarts lead to repeated savepoint restoration?
Thanks for your reply! What I have seen is that the job terminates when there's an intermittent loss of connectivity with ZooKeeper. This is in fact the most common reason why our jobs are terminating at this point. Worse, the job is unable to restore from a checkpoint during some (not all) of these terminations. Under these scenarios, won't the job try to recover from the savepoint? I've gone through the various tickets reporting stability issues due to ZooKeeper that you mentioned you intend to resolve soon. But until the ZooKeeper-based HA is stable, should we assume that the job will repeatedly restore from savepoints? I would rather rely on Kafka offsets to resume where it left off than on savepoints.
Re: Challenges Deploying Flink With Savepoints On Kubernetes
I'm not sure this addresses the original concern. For instance, consider this sequence:

1. The job starts from a savepoint.
2. The job creates a few checkpoints.
3. The job manager (just one, in Kubernetes) crashes and restarts with the command specified in the Kubernetes manifest, which still contains the savepoint path.

Will ZooKeeper-based HA ensure that this savepoint path is ignored? I've asked this and various other questions here: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Will-job-manager-restarts-lead-to-repeated-savepoint-restoration-tp40188.html
Re: state inside functions
When running in HA mode or taking savepoints, if we pass configuration as constructor arguments, then changing the configuration at a later time doesn't seem to work, since the restored state carries the older configuration. How can we pass configuration while keeping the flexibility to change the values at a later date? I've started another discussion with many more questions: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Changing-application-configuration-when-restoring-from-checkpoint-savepoint-tp40189.html
Changing application configuration when restoring from checkpoint/savepoint
My Flink job loads several configuration files that contain job, operator and business configuration. One of the operators is an AsyncOperator with a function like so:

class AsyncFun(config: T) extends RichAsyncFunction[X, Y] {
  @transient private lazy val client = f(config, metricGroup, etc.)
  @transient private lazy val metricGroup = ...
  def asyncInvoke(input: X, resultFuture: ResultFuture[Y]): Unit = ...
}

The variables are declared lazily as an alternative to implementing the open method; this is unavoidable as we're relying on Flink's monitoring libraries. The application resumes from a checkpoint upon unexpected termination. However, sometimes I want to change the config that's passed as a constructor argument, and that doesn't work, as Flink restores from the submitted JobGraph. This makes sense, as Flink by itself doesn't know whether it's recovering from an abrupt termination and must therefore rely on the old config to build the client, or whether it should start afresh. I want to know what options we have to allow configuration changes (i.e. re-initializing the operators):

1. Is there any way to restore from a checkpoint and still recreate the client using newer configuration?
2. If we take a savepoint (drain and save) and then resume the job, will the configuration changes take effect?
3. Will we have to move away from Flink monitoring so that the client can be initialized inside the constructor?
4. One option is to remove the constructor argument entirely and load the config inside the open method. I want to know how this can be done without exposing the entire application configuration. I could store the configuration inside the job parameters (by somehow converting the object to a map, which I'd rather not do), but how do I load the right part back, given that this function is used by multiple operators?
5. Any other options? For functions that aren't an AsyncFunction, is leveraging BroadcastState the only way to dynamically update configuration?
Re: Changing application configuration when restoring from checkpoint/savepoint
Will this work: in the main method, serialize the config into a string and store it using ParameterTool, with the task name as the key and the serialized config as the value. Then, in the open method, look up the relevant configuration using getTaskName().

A follow-up to this concerns configuring custom windowing functions. I have a size-based as well as a time-based window class, where the size and time limits are configurable and passed as constructor arguments. How can this configuration be changed when state persistence/recovery is enabled? A window doesn't have an open method per se.
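A sketch of what I'm describing, in case it helps (MyClient, the key names and the serialization are placeholders; the global-job-parameters lookup is the part I'd like confirmed):

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.MetricGroup
import org.apache.flink.streaming.api.functions.async.{ResultFuture, RichAsyncFunction}

// Placeholder for the real service client.
class MyClient
object MyClient {
  def fromString(conf: String, metrics: MetricGroup): MyClient = new MyClient
}

// In main():
//   val params = ParameterTool.fromMap(java.util.Collections.singletonMap(taskName, serializedConf))
//   env.getConfig.setGlobalJobParameters(params)

class AsyncFun extends RichAsyncFunction[String, String] {
  @transient private var client: MyClient = _

  override def open(parameters: Configuration): Unit = {
    val params = getRuntimeContext.getExecutionConfig
      .getGlobalJobParameters.asInstanceOf[ParameterTool]
    // Look up only this operator's slice of the configuration, keyed by task name.
    val confString = params.get(getRuntimeContext.getTaskName)
    client = MyClient.fromString(confString, getRuntimeContext.getMetricGroup)
  }

  override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit = {
    // ... issue the request via `client`, then complete the future ...
    resultFuture.complete(java.util.Collections.singleton(input))
  }
}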
Will job manager restarts lead to repeated savepoint restoration?
My Flink job runs in Kubernetes. This is the setup:

1. One job running as a job cluster with one job manager
2. HA powered by ZooKeeper (works fine)
3. Job/Deployment manifests stored in GitHub and deployed to Kubernetes by Argo
4. State persisted to S3

If I were to stop the job (drain and take a savepoint) and resume, I would have to update the job manager manifest with the savepoint location, commit it to GitHub and redeploy. After deployment, I would presumably have to modify the manifest again to remove the savepoint location, so as to avoid starting the application from the same savepoint later. This raises some questions:

1. If the job manager were to crash before the manifest is updated again, won't Kubernetes restart the job manager from the savepoint rather than from the latest checkpoint?
2. Is there a way to ensure that restoration from a savepoint doesn't happen more than once, or not after the first successful checkpoint?
3. If even one checkpoint has been completed, the job should prefer the checkpoint over the savepoint. Will that happen automatically given ZooKeeper HA?
4. Is it possible not to have to remove the savepoint path from the Kubernetes manifest and simply rely on newer checkpoints/savepoints? It feels rather clumsy to have to add and remove it manually. We could use a cron job to remove it, but that's still clumsy.
5. Is there a way of asking Flink to use the latest savepoint rather than specifying its location? If I were to manually rename the S3 savepoint location to something fixed (s3://fixed_savepoint_path_always), would there be any problem restoring the job?
6. Is there any open source tool that solves this problem?