Re: Flink auto-scaling feature and documentation suggestions

2021-05-06 Thread vishalovercome
Thank you for answering all my questions. My suggestion would be to start off
by exposing an API that allows dynamically changing operator parallelism, as
the users of Flink will be better able to decide the right scaling policy.
Once this functionality is there, it's just a matter of providing policies
(ratio based, throughput based, back-pressure based). The web UI could be
used for setting parallelism as well.

An analogy would be autoscaling provided by cloud providers. The features
provided are:
1. Web UI for manually overriding parallelism (min, max, desired)
2. Metric based scaling policies

It will be difficult for developers to think of a reasonable value for
maxParallelism for each operator and, as I explained above, sometimes even
a small increase in parallelism is enough to bring things down. A UI or
external-policy-based approach will allow for quick experimentation and
fine-tuning. I don't think it will be possible for Flink developers to build
a one-size-fits-all solution.
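To make the proposal above concrete, here is a minimal sketch of the two features from the cloud-autoscaling analogy: a manual override (the "desired" value) and a metric-based policy, both clamped to per-operator min/max bounds. Everything here is hypothetical: ScalingPolicy, Bounds, BackPressurePolicy, Autoscaler and the "backPressure" metric key are illustrative names, not Flink APIs, and the thresholds are arbitrary.

```scala
// Hypothetical policy interface; nothing here is a Flink API.
trait ScalingPolicy {
  def desiredParallelism(current: Int, metrics: Map[String, Double]): Int
}

// Per-operator bounds, mirroring the min/max knobs of cloud autoscalers.
final case class Bounds(min: Int, max: Int) {
  require(min >= 1 && min <= max, s"invalid bounds $min..$max")
  def clamp(p: Int): Int = math.max(min, math.min(max, p))
}

// Back-pressure based: step up when the (hypothetical) "backPressure" metric
// is above `high`, step down when it is below `low`, otherwise keep as is.
final class BackPressurePolicy(high: Double, low: Double, step: Int) extends ScalingPolicy {
  def desiredParallelism(current: Int, metrics: Map[String, Double]): Int = {
    val bp = metrics.getOrElse("backPressure", 0.0)
    if (bp > high) current + step
    else if (bp < low) current - step
    else current
  }
}

object Autoscaler {
  // A manual override (e.g. set from a UI) wins over the metric-based policy;
  // either way the result is clamped to the operator's bounds.
  def decide(current: Int, bounds: Bounds, policy: ScalingPolicy,
             metrics: Map[String, Double], manualOverride: Option[Int]): Int =
    bounds.clamp(manualOverride.getOrElse(policy.desiredParallelism(current, metrics)))
}
```

Keeping the policy behind a small interface is what would let ratio-, throughput- or back-pressure-based strategies be swapped without touching the scaling mechanism itself.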



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink auto-scaling feature and documentation suggestions

2021-05-06 Thread vishalovercome
We do exactly what you mentioned. Unfortunately, it's not that simple. Our
services don't have predictable performance, as traffic varies a lot during
the day.

As I've explained above, increasing source parallelism to 2 was enough to tip
over our services, and reducing the parallelism of the async operator to 2 is
not an option.





Re: Flink auto-scaling feature and documentation suggestions

2021-05-06 Thread vishalovercome
I am using the async IO operator. The problem is that increasing source
parallelism from 1 to 2 was enough to tip our systems over the edge.
Reducing the parallelism of the async IO operator to 2 is not an option, as
that would reduce the throughput quite a bit. This means that no matter what
we do, we'll end up with different operators having different parallelism.
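A back-of-the-envelope way to see why shrinking the async operator hurts: by Little's law, in-flight requests = throughput * latency, so with at most `capacity` in-flight requests per subtask (the capacity argument of AsyncDataStream.unorderedWait/orderedWait) throughput is bounded by parallelism * capacity / latency. The sketch assumes a fixed average latency and ignores every other bottleneck; AsyncThroughput is a hypothetical name.

```scala
// Upper bound on async I/O throughput from Little's law:
// in-flight = throughput * latency, and in-flight <= parallelism * capacity.
object AsyncThroughput {
  def maxRequestsPerSecond(parallelism: Int, capacity: Int, latencySeconds: Double): Double = {
    require(parallelism > 0 && capacity > 0 && latencySeconds > 0)
    parallelism * capacity / latencySeconds
  }
}
```

With a hypothetical 50 ms latency and capacity 100, going from parallelism 8 to 2 lowers the bound from 16000 to 4000 requests per second.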

What I meant by "running all operators at such a high scale would result
in wastage of resources, even with operator chaining in place" was that
creating as many subtasks as the windowing operator has for each of my
operators would lead to sub-optimal performance. While chaining would ensure
that all tasks run in one slot, the partitioning of data would still cause
the same network IO, as chaining doesn't guarantee that a given tuple is
processed within one slot.

In my experience, running every operator with the same parallelism
is always inferior to hand-tuned parallelism.





Re: Flink auto-scaling feature and documentation suggestions

2021-05-05 Thread vishalovercome
Yes. While back-pressure would eventually ensure high throughput, hand-tuning
parallelism became necessary because the job with high source parallelism
would immediately bring down our internal services, not giving Flink enough
time to adjust the in-rate. Plus, running all operators at such a high
scale would result in wastage of resources, even with operator chaining in
place.

That's why I think more toggles are needed to make the current auto-scaling
truly shine.





Re: Flink auto-scaling feature and documentation suggestions

2021-05-04 Thread vishalovercome
In one of my jobs, windowing is the costliest operation, while the upstream
and downstream operators are not as resource intensive. There's another
operator in this job that communicates with internal services. It has high
parallelism as well, but not as high as that of the windowing operation.
Running all operators with the same parallelism as the windowing operation
would choke some of our internal services, because we'd be consuming from our
source at a rate much higher than what our internal services can handle. Thus
our sources, sinks, validation and monitoring related operators have very low
parallelism, while one operator has high parallelism and another has even
higher parallelism.





Re: Flink auto-scaling feature and documentation suggestions

2021-05-04 Thread vishalovercome
Forgot to add one more question. 7. If maxParallelism needs to be set to
control parallelism, then wouldn't that mean that we would never be able
to take a savepoint and rescale beyond the configured maxParallelism? This
would mean that we can never achieve hand-tuned resource efficiency. I will
need to set maxParallelism beyond the current parallelism and, given the
current tendency to allocate the same number of sub-tasks to each operator, I
will inevitably end up with several under-utilized operators.
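For reference, this is how the default max parallelism is derived when none is set explicitly, as I understand Flink's KeyGroupRangeAssignment.computeDefaultMaxParallelism: 1.5 times the operator parallelism, rounded up to a power of two and clamped to [128, 32768]. The canRescaleTo helper just encodes the constraint the question is about (a job cannot be rescaled beyond its max parallelism). MaxParallelism is a hypothetical name; check the numbers against the Flink version in use.

```scala
object MaxParallelism {
  val LowerBound = 128    // 1 << 7
  val UpperBound = 32768  // 1 << 15

  // Smallest power of two that is >= x (for 1 <= x <= 2^30).
  def roundUpToPowerOfTwo(x: Int): Int = {
    require(x >= 1 && x <= (1 << 30))
    if (x == 1) 1 else Integer.highestOneBit(x - 1) << 1
  }

  // Default max parallelism derived from the operator parallelism.
  def defaultFor(parallelism: Int): Int =
    math.min(math.max(roundUpToPowerOfTwo(parallelism + parallelism / 2), LowerBound), UpperBound)

  // Rescaling is only possible up to the max parallelism in effect
  // when the keyed state was written.
  def canRescaleTo(newParallelism: Int, maxParallelism: Int): Boolean =
    newParallelism >= 1 && newParallelism <= maxParallelism
}
```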





Re: Flink auto-scaling feature and documentation suggestions

2021-05-04 Thread vishalovercome
Some questions about the adaptive scheduling documentation: "If new slots
become available the job will be scaled up again, up to the configured
parallelism".

Does parallelism refer to maxParallelism or parallelism? I'm guessing it's
the latter, because the doc later mentions: "In Reactive Mode (see above)
the configured parallelism is ignored and treated as if it was set to
infinity, letting the job always use as many resources as possible."

The other question I have is along the same lines as the one I mentioned
earlier: what strategy is used to allocate sub-tasks? Is it the ratio of
parallelism that's configured, or does it try to achieve as much operator
chaining as possible?
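One way to read the two quoted sentences, as a simplified model: assume every operator is in the same slot sharing group (the default), so a slot can host one subtask of every operator. Under the adaptive scheduler each operator would then be capped by its configured parallelism and by the free slots, while in Reactive Mode the configured value is treated as infinite, leaving the slots and the max parallelism as the only caps. This is my interpretation of the docs, not Flink's scheduler code; Op and AdaptiveModel are hypothetical names.

```scala
// Hypothetical operator description.
final case class Op(name: String, configured: Int, maxParallelism: Int)

// Simplified model assuming one slot sharing group for all operators.
object AdaptiveModel {
  // Adaptive scheduler: scale up to the configured parallelism.
  def adaptive(ops: Seq[Op], freeSlots: Int): Map[String, Int] =
    ops.map(o => o.name -> math.min(o.configured, freeSlots)).toMap

  // Reactive Mode: configured parallelism treated as infinite, so only the
  // free slots and (assumed here) the max parallelism cap each operator.
  def reactive(ops: Seq[Op], freeSlots: Int): Map[String, Int] =
    ops.map(o => o.name -> math.min(o.maxParallelism, freeSlots)).toMap
}
```

Under this model Reactive Mode gives every operator the same parallelism, which is exactly the concern behind the ratio question.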





Flink auto-scaling feature and documentation suggestions

2021-05-04 Thread vishalovercome
Thank you very much for the new release that makes auto-scaling possible. I'm
currently running multiple Flink jobs and I've hand-tuned the parallelism of
each of the operators to achieve the best throughput. I would much rather
use the auto-scaling capabilities of Flink than have to hand-tune my jobs,
but it seems there are a few gaps:

1. Setting max parallelism seems to be the only user-controlled knob at the
moment. As Flink tries to achieve operator chaining by launching the same
number of sub-tasks for each operator, I'm afraid the current auto-scaling
will be very inefficient. At a minimum, we need to support user-provided
ratios that will be used to distribute sub-tasks among operators. E.g.
O1:O2 = 4:1 will mean that 4 sub-tasks of O1 should be started for each
sub-task of O2.

2. Allow an external system to set the parallelism of operators. Perhaps the
job manager's REST API can be extended to support such scaling requests.

3. The doc says that local recovery doesn't work. This makes sense when a
restart is due to a scaling action, but I couldn't quite understand why that
needs to be the case when a task manager is recovering from a crash.

4. Is there any metric that allows us to distinguish between a restart due
to scaling and a restart due to some other reason? Based on the section on
limitations there isn't one, but it would be good to add it, as people will
eventually want to monitor and alert on restarts due to failures alone.

5. Suppose the number of containers is fixed and the job is running. Will
Flink internally rebalance by adding sub-tasks of one operator and removing
sub-tasks of another? This could be driven by back-pressure, for instance.
The doc doesn't mention this, so I'm assuming that the current scaling is
designed to maximize operator chaining. However, it does make sense to
incorporate back-pressure to rebalance. If this is how future versions of
auto-scaling will work, then we'll need some toggles to avoid restart loops.

6. How is the implementation different from taking a savepoint and manually
rescaling? Are there any operator-specific gotchas that we should watch out
for? For instance, we use the AsyncIO operator and wanted to know how
in-flight requests to a database would be handled when its parallelism
changes.
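The ratio proposal in point 1 could work roughly like this: the operator with the largest weight gets a given parallelism and the others get a proportional share, rounded down but never below one subtask. This is a sketch of the proposed feature, not an existing Flink option; RatioAllocation and allocate are hypothetical names.

```scala
// Hypothetical ratio-based allocation of sub-tasks among operators.
object RatioAllocation {
  // ratios: operator -> weight, e.g. Map("O1" -> 4, "O2" -> 1).
  // top: parallelism given to the operator(s) with the largest weight.
  def allocate(top: Int, ratios: Map[String, Int]): Map[String, Int] = {
    require(top >= 1 && ratios.nonEmpty && ratios.values.forall(_ >= 1))
    val maxWeight = ratios.values.max
    // Integer division rounds down; every operator keeps at least one subtask.
    ratios.map { case (op, w) => op -> math.max(1, top * w / maxWeight) }
  }
}
```

For O1:O2 = 4:1 and a top parallelism of 8, this yields 8 subtasks for O1 and 2 for O2.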

Once again, thank you for your continued support!





Zookeeper or Kubernetes for HA?

2021-05-03 Thread vishalovercome
The Flink docs provide details on setting up HA but don't provide any
recommendations as such. For jobs running in Kubernetes with a ZooKeeper
deployment available, which high availability option would be more
desirable?





Re: [ANNOUNCE] Apache Flink 1.13.0 released

2021-05-03 Thread vishalovercome
This is a very big release! Many thanks to the Flink developers for their
contributions to making Flink as good a framework as it is!





Re: Dynamic configuration via broadcast state

2021-04-06 Thread vishalovercome
I researched a bit more, and another suggested solution is to build a custom
source function that somehow waits for each operator to load its
configuration, which is in fact set in the open method of the source itself.
I'm not sure if that's a good idea, as that just exposes the entire job
configuration to an operator.

Can we leverage watermarks/idle sources somehow? Basically, set the timestamp
of the configuration stream to a very low number at the start and then force
it to be read before data from other sources starts flowing in. As
configurations aren't going to change frequently, we can idle these sources.

1. Is the above approach even possible?
2. Can an idle source resume once configuration changes? 

A rough sketch of timestamp assignment and of re-activating an idle source
would help!
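A simplified model of the watermark side of this idea, assuming the usual rule that a multi-input operator forwards the minimum watermark of its inputs and ignores inputs marked idle. It shows why an idle configuration stream stops holding back the data stream, and that once it becomes active again with a very low watermark it holds event time back immediately. Watermarks only drive event-time progress (timers, windows); they do not by themselves delay the delivery of records to processElement. InputStatus and WatermarkModel are hypothetical names.

```scala
// Watermark and idleness of one input channel.
final case class InputStatus(watermark: Long, idle: Boolean)

object WatermarkModel {
  // Combined watermark = minimum over non-idle inputs;
  // None when every input is idle (nothing to emit).
  def combined(inputs: Seq[InputStatus]): Option[Long] = {
    val active = inputs.filterNot(_.idle).map(_.watermark)
    if (active.isEmpty) None else Some(active.min)
  }
}
```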





Dynamic configuration via broadcast state

2021-04-06 Thread vishalovercome
I have to make my Flink job dynamically configurable and I'm thinking about
using broadcast state. My current static job configuration file contains the
configuration of the entire set of operators, which I load into a case class,
and then I explicitly pass the relevant configuration of each operator as its
constructor parameters. Will I have to create individual broadcast streams
for each operator, i.e.:

val o1conf: BroadcastStream[O1Conf] = ...
someStream.connect(o1conf).map(...)

someOtherStream.connect(o2conf).flatMap(...) and so on?

1. Is there a way to just load the configuration as a whole but only pick
the right subset in the connect method like so:

someStream.connect(jobConfig.o1Conf).map(...)

My job has several operators and it seems rather clumsy to have to
instantiate 1 broadcast stream for each dynamically configurable operator.

2. Is there a way to guarantee that processElement isn't called before the
first call to processBroadcastElement? How else can we ensure that each
operator always starts with a valid configuration? Passing the same
configuration as constructor parameters is one way to deal with it, but it's
clumsy because that's just repetition of code. Loading configuration in the
open method is even worse, because each operator will then have access to the
entire job configuration.

3. What can we do to make source and sink connectors dynamically
configurable?
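For question 2 (and partly question 1), a common pattern is to buffer elements until the first configuration arrives and to broadcast the whole job config once, letting each operator pick its slice with a selector function. The model below shows only that control flow in plain Scala: onConfig and onElement stand in for processBroadcastElement and processElement, and ConfigGated, JobConfig and O1Conf are hypothetical. In a real BroadcastProcessFunction the buffer would have to live in Flink state so it survives a restart; here it is just an in-memory buffer.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical job-wide config; each operator only sees the part it selects.
final case class O1Conf(threshold: Int)
final case class JobConfig(o1: O1Conf, version: Long)

// Elements arriving before the first config are buffered,
// then processed as soon as a config is known.
final class ConfigGated[C, In, Out](select: JobConfig => C, f: (C, In) => Out) {
  private var current: Option[C] = None
  private val pending = ArrayBuffer.empty[In]

  // Stand-in for processBroadcastElement: store the slice, flush the buffer.
  def onConfig(job: JobConfig): Seq[Out] = {
    val c = select(job)
    current = Some(c)
    val flushed = pending.toList.map(f(c, _))
    pending.clear()
    flushed
  }

  // Stand-in for processElement: process if configured, otherwise buffer.
  def onElement(in: In): Seq[Out] = current match {
    case Some(c) => Seq(f(c, in))
    case None    => pending += in; Seq.empty
  }
}
```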





Re: Do data streams partitioned by same keyBy but from different operators converge to the same sub-task?

2021-03-29 Thread vishalovercome
I've gone through the example as well as the documentation and I still
couldn't understand whether my use case requires joining.

1. What would happen if I didn't join?
2. As the 2 incoming data streams have the same type, if joining is
absolutely necessary, then just a union (oneStream.union(anotherStream))
followed by a keyBy should be good enough, right?

I am asking this because I would prefer to use the simple RichMapFunction
or RichFlatMapFunction as opposed to the RichCoFlatMapFunction.

Thanks a lot!




Re: Do data streams partitioned by same keyBy but from different operators converge to the same sub-task?

2021-03-24 Thread vishalovercome
Let me make the example more concrete. Say O1 gets as input a data stream T1,
which it splits into two using some function, producing DataStreams of type
T2 and T3, each of which is partitioned by the same key function TK.
Now after O2 processes a stream, it could sometimes send the stream to O3
(T4) using the same key function again. Now I want to know whether:

1. Data from stream T3 with key K and from stream T4 with key K end up
affecting the state variables for the same key K, or different ones. I would
think it's the same, but wanted confirmation.
2. An explicit join is needed or not, i.e. whether this will achieve what I
want:

result2 = T1.filter(fn2).keyBy(TK).map(richfn2).keyBy(TK).map(whatever O3 does)
result3 = T1.filter(fn3).keyBy(TK).map(whatever O3 does)






Do data streams partitioned by same keyBy but from different operators converge to the same sub-task?

2021-03-23 Thread vishalovercome
Suppose I have a job with 3 operators with the following job graph:

O1 => O2 // data stream partitioned by keyBy
O1 => O3 // data stream partitioned by keyBy
O2 => O3 // data stream partitioned by keyBy 

If operator O3 receives inputs from two operators and both inputs have the
same type and value for a key, then will the two streams end up in the same
sub-task and therefore affect the same state variables keyed to that
particular key? Do the streams themselves have to have the same type, or is
it enough that just the keys of each of the input streams have the same type
and value?

If they're not guaranteed to affect the same state, then how can we achieve
that? I would prefer to use the simple RichMapFunction/RichFlatMapFunction
for modelling my operators as opposed to any join function.
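A model of how keyBy picks the target subtask, as I understand Flink's key-group assignment (KeyGroupRangeAssignment): the key's hashCode is hashed into one of maxParallelism key groups, and key group kg goes to subtask kg * parallelism / maxParallelism. Only the key enters the calculation, not the sending operator or the element type, so equal keys with equal hashCodes land on the same subtask of O3. byteswap32 is a stand-in for Flink's own murmur hash, so concrete indices will differ from a real job; KeyRouting is a hypothetical name. The sketch does not cover how two inputs reach one operator in the first place; in the DataStream API that takes a union or a connect.

```scala
import scala.util.hashing.byteswap32

object KeyRouting {
  // Key group of a key (stand-in hash, made non-negative).
  def keyGroup(key: Any, maxParallelism: Int): Int =
    (byteswap32(key.hashCode) & Int.MaxValue) % maxParallelism

  // Key groups are split into contiguous ranges, one per subtask.
  def subtask(keyGroup: Int, maxParallelism: Int, parallelism: Int): Int =
    keyGroup * parallelism / maxParallelism

  // Target subtask: depends only on the key and the two parallelism values.
  def route(key: Any, maxParallelism: Int, parallelism: Int): Int =
    subtask(keyGroup(key, maxParallelism), maxParallelism, parallelism)
}
```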





Re: Will job manager restarts lead to repeated savepoint restoration?

2020-12-21 Thread vishalovercome
I don't know how to reproduce it, but what I've observed are three kinds of
termination when connectivity with ZooKeeper is somehow disrupted. I don't
think it's an issue with ZooKeeper itself, as it has supported a much bigger
Kafka cluster for a few years.

1. The first kind is exactly this:
https://github.com/apache/flink/pull/11338. Basically, a temporary loss of
connectivity or a rolling upgrade of ZooKeeper will cause the job to
terminate. It will eventually restart from where it left off.
2. The second kind is when the job terminates and restarts for the same
reason but is unable to recover from a checkpoint. I think it's similar to
this: https://issues.apache.org/jira/browse/FLINK-19154. If upgrading to
1.12.0 (from 1.11.2) will fix the second issue, then I'll upgrade.
3. The third kind is where it repeatedly restarts, as it's unable to
establish a session with ZooKeeper. I don't know if reducing the session
timeout will help here, but in this case I'm forced to disable ZooKeeper HA
entirely, as the job cannot even restart.

I could create a JIRA ticket to discuss ZooKeeper itself if you suggest it,
but the ZooKeeper and savepoint issues are related, as I'm not sure what
will happen in each of the above cases.





Re: Challenges Deploying Flink With Savepoints On Kubernetes

2020-12-21 Thread vishalovercome
Thanks for your reply!

What I have seen is that the job terminates when there's an intermittent loss
of connectivity with ZooKeeper. This is in fact the most common reason why
our jobs are terminating at this point. Worse, it's unable to restore from a
checkpoint during some (not all) of these terminations. Under these
scenarios, won't the job try to recover from a savepoint?

I've gone through various tickets reporting stability issues due to
ZooKeeper that you've mentioned you intend to resolve soon. But until
ZooKeeper-based HA is stable, should we assume that it will repeatedly
restore from savepoints? I would rather rely on Kafka offsets to resume
where it left off than on savepoints.






Re: Will job manager restarts lead to repeated savepoint restoration?

2020-12-21 Thread vishalovercome
Thanks for your reply!

What I have seen is that the job terminates when there's an intermittent loss
of connectivity with ZooKeeper. This is in fact the most common reason why
our jobs are terminating at this point. Worse, it's unable to restore from a
checkpoint during some (not all) of these terminations. Under these
scenarios, won't the job try to recover from a savepoint?

I've gone through various tickets reporting stability issues due to
ZooKeeper that you've mentioned you intend to resolve soon. But until
ZooKeeper-based HA is stable, should we assume that it will repeatedly
restore from savepoints? I would rather rely on Kafka offsets to resume
where it left off than on savepoints.





Re: Challenges Deploying Flink With Savepoints On Kubernetes

2020-12-16 Thread vishalovercome
I'm not sure if this addresses the original concern. For instance, consider
this sequence:

1. The job starts from a savepoint
2. The job creates a few checkpoints
3. The job manager (just one in Kubernetes) crashes and restarts with the
commands specified in the Kubernetes manifest, which has the savepoint path

Will ZooKeeper-based HA ensure that this savepoint path will be ignored?

I've asked this and various other questions here -
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Will-job-manager-restarts-lead-to-repeated-savepoint-restoration-tp40188.html





Re: state inside functions

2020-12-16 Thread vishalovercome
When running in HA mode or taking savepoints, if we pass configuration as
constructor arguments, then it seems as though changing the configuration at
a later time doesn't work, as Flink uses state to restore the older
configuration. How can we pass configuration while keeping the flexibility
to change the values at a later date?

I've started another discussion with many more questions -
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Changing-application-configuration-when-restoring-from-checkpoint-savepoint-tp40189.html





Changing application configuration when restoring from checkpoint/savepoint

2020-12-16 Thread vishalovercome
My Flink job loads several configuration files that contain job, operator
and business configuration. One of the operators is an AsyncOperator with a
function like so:

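import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}

class AsyncFun(config: T) extends RichAsyncFunction[X, Y] {
   // initialized lazily on first use, instead of in open()
   @transient private lazy val client = f(config, metricGroup, etc.)
   @transient private lazy val metricGroup = ...

   override def asyncInvoke(input: X, resultFuture: ResultFuture[Y]): Unit = ...
}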

The variables are declared lazily as an alternative to implementing the open
method. This is unavoidable, as we're relying on Flink's monitoring
libraries. The application resumes from a checkpoint upon unexpected
termination. However, sometimes I want to change the config parameter that's
passed as a constructor argument, but it doesn't work, as Flink tries to
restore from the submittedJobGraph. This makes sense, as Flink by itself
can't tell whether it's recovering from an abrupt termination (and must
therefore rely on the old config to build the client) or should start
afresh. I want to know what options we have to allow for configuration
changes (i.e. re-initializing the operators):

1. Is there any way to restore from a checkpoint as well as recreate the
client using the newer configuration?
2. If we take a savepoint (drain and save) and then resume the job, will the
configuration changes take effect?
3. Will we have to move away from Flink monitoring so as to initialize the
client inside the constructor?
4. One option is to remove the constructor argument entirely and load the
config inside the open method. I want to know how this can be done without
exposing the entire application configuration. I could store the
configuration inside job parameters (by somehow converting this object to a
map, which I'd rather not do), but how would I load it back, given that this
function is used by multiple operators?
5. Any other option?

For functions that aren't AsyncFunction, is leveraging BroadcastState the
only way to dynamically update configuration?
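A plain-Scala illustration of the behaviour described above, assuming (as the post says) that recovery rebuilds operators from the submitted job graph: the function object is serialized together with its constructor argument at submission, so editing the config file afterwards cannot reach the copy that is deserialized on recovery, while a fresh submission picks up the new value. ClientConf, AsyncFunModel and JobGraphModel are hypothetical stand-ins for T and AsyncFun; this is not Flink's actual job-graph code.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream, ObjectStreamClass}

// Hypothetical stand-ins for T and AsyncFun.
final case class ClientConf(endpoint: String, timeoutMs: Int)
final class AsyncFunModel(val config: ClientConf) extends Serializable

object JobGraphModel {
  // Roughly what submission does: the function and its constructor
  // argument are captured as bytes.
  def freeze(a: java.io.Serializable): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(a) finally out.close()
    bytes.toByteArray
  }

  // Roughly what recovery does: rebuild the function from those bytes.
  def thaw[A](bytes: Array[Byte]): A = {
    val loader = getClass.getClassLoader
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes)) {
      // Resolve classes with this object's class loader (helps in REPL-like setups).
      override protected def resolveClass(desc: ObjectStreamClass): Class[_] =
        try Class.forName(desc.getName, false, loader)
        catch { case _: ClassNotFoundException => super.resolveClass(desc) }
    }
    try in.readObject().asInstanceOf[A] finally in.close()
  }
}
```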





Re: Changing application configuration when restoring from checkpoint/savepoint

2020-12-16 Thread vishalovercome
Will this work? In the main method, serialize the config into a string and
store it using ParameterTool, with the task name as the key and the config
(serialized as a string) as the value. Then in the open method, look up the
relevant configuration using getTaskName().
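A sketch of the flattening step this approach needs, with hypothetical helper names: per-operator settings are flattened into one String-to-String map with an "operator.key" naming scheme (the shape ParameterTool.fromMap accepts after an asJava conversion), and each operator reads back only its own slice. The operator key is passed explicitly here rather than taken from getTaskName(), on the assumption that with chaining a task name may combine several operator names; worth checking before relying on task names.

```scala
// Hypothetical helpers for per-operator configuration in one flat map.
object OperatorConfig {
  // Map("window" -> Map("size" -> "1000")) becomes Map("window.size" -> "1000").
  def flatten(perOperator: Map[String, Map[String, String]]): Map[String, String] =
    for {
      (op, settings) <- perOperator
      (k, v) <- settings
    } yield s"$op.$k" -> v

  // Only the entries under "<op>." are visible to that operator, prefix removed.
  def sliceFor(op: String, flat: Map[String, String]): Map[String, String] = {
    val prefix = s"$op."
    flat.collect { case (k, v) if k.startsWith(prefix) => k.stripPrefix(prefix) -> v }
  }
}
```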

A follow-up to this would be configuring custom windowing functions. I have
a size-based as well as a time-based window class, where the size and time
limits are configurable and passed as constructor arguments. How can this
configuration be changed when state persistence/recovery is enabled? A window
doesn't have an open method per se.







Will job manager restarts lead to repeated savepoint restoration?

2020-12-16 Thread vishalovercome
My Flink job runs in Kubernetes. This is the setup:

1. One job running as a job cluster with one job manager
2. HA powered by ZooKeeper (works fine)
3. Job/Deployment manifests stored in GitHub and deployed to Kubernetes by
Argo
4. State persisted to S3

If I were to stop (drain and take a savepoint) and resume, I'd have to
update the job manager manifest with the savepoint location, save it in
GitHub and redeploy. After deployment, I'd presumably have to modify the
manifest again to remove the savepoint location, so as to avoid starting the
application from the same savepoint. This raises some questions:

1. If the job manager were to crash before the manifest is updated again,
then won't Kubernetes restart the job manager from the savepoint rather than
the latest checkpoint?
2. Is there a way to ensure that restoration from a savepoint doesn't happen
more than once? Or not after the first successful checkpoint?
3. If even one checkpoint has been finalized, then the job should prefer the
checkpoint over the savepoint. Will that happen automatically given
ZooKeeper?
4. Is it possible to not have to remove the savepoint path from the
Kubernetes manifest and simply rely on newer checkpoints/savepoints? It
feels rather clumsy to have to add it and then remove it again manually. We
could use a cron job to remove it, but it's still clumsy.
5. Is there a way of asking Flink to use the latest savepoint rather than
specifying the location of the savepoint? If I were to manually rename the
S3 savepoint location to something fixed (s3://fixed_savepoint_path_always),
then would there be any problem restoring the job?
6. Any open source tool that solves this problem?
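The behaviour asked for in questions 2 and 3 can be written down as a small decision function: prefer the newest completed checkpoint known to the HA store, and fall back to the manifest's savepoint only when no checkpoint exists yet. This only encodes the desired policy; it is not a description of what Flink's ZooKeeper HA actually does, and RestorePolicy, RestorePoint and the S3 paths in the usage are hypothetical.

```scala
// Where a (re)started job should resume from.
sealed trait RestorePoint
final case class FromCheckpoint(id: Long, path: String) extends RestorePoint
final case class FromSavepoint(path: String) extends RestorePoint
case object FreshStart extends RestorePoint

object RestorePolicy {
  // Newest HA checkpoint wins; the manifest savepoint is used only
  // before the first checkpoint exists; otherwise start fresh.
  def choose(haCheckpoints: Seq[FromCheckpoint], manifestSavepoint: Option[String]): RestorePoint =
    if (haCheckpoints.nonEmpty) haCheckpoints.maxBy(_.id)
    else manifestSavepoint.map(FromSavepoint(_)).getOrElse(FreshStart)
}
```

With this policy the savepoint path could stay in the manifest permanently, because it is consulted only until the first checkpoint completes.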



