Hi Krzysiek,
the idea is quite interesting - although maintaining the coordination
needed to handle checkpoints would probably be pretty tricky. Did you
figure out how to handle proper distribution of tasks between TMs? As
far as I understand, you have to guarantee that all sources reading from
the cache are on the same TM as the sinks writing the data from Kafka?
Or are you thinking about some distributed cache?
As for your original question - we are also looking for solutions/ideas
for this problem in Nussknacker. We have a similar problem, but with
different constraints (on-premise, no need to care too much about
bandwidth), and we went with "one job per scenario". It works OK, but the
biggest problem for me is that it does not scale with the number of
jobs: a Flink job is quite a heavy entity - all the threads, classloaders
etc. Having more than a few dozen jobs is also not so easy to handle
on the JobManager side - especially when it's restarted etc. I guess your
idea would also suffer from this problem?
thanks,
maciek
On 27/03/2020 10:18, Krzysztof Zarzycki wrote:
I want to do a slightly different, hacky PoC:
* I will write a sink that caches the results in "JVM global" memory.
Then I will write a source that reads this cache.
* I will launch one job that reads from the Kafka source, shuffles the
data to the desired partitioning and then sinks to that cache.
* Then I will launch multiple jobs (DataStream-based or Flink
SQL-based) that use the cache source to read the data out and then
reinterpret it as a keyed stream [1].
* Using JVM global memory is necessary because AFAIK the jobs use
different classloaders. The class of the cached objects also needs to be
available in the parent classloader, i.e. on the cluster's classpath.
This is just to prove the idea and its performance and usefulness.
All the problems of checkpointing this data I will leave for later.
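To make it concrete, a minimal sketch of the cache part could look like
the code below. All names are made up and this only illustrates the
idea; fan-out to multiple reader jobs and checkpointing are left out:

import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Must live on the cluster's classpath (parent classloader) so that the
// caching job and the reader jobs resolve the same static state.
public final class JvmGlobalCache {

    private static final Map<Integer, BlockingQueue<byte[]>> QUEUES =
            new ConcurrentHashMap<>();

    static BlockingQueue<byte[]> queueFor(int partition) {
        return QUEUES.computeIfAbsent(
                partition, p -> new ArrayBlockingQueue<>(10_000));
    }

    /** Sink for the caching job: stores pre-partitioned records. */
    public static class CacheSink implements SinkFunction<byte[]> {
        private final int partition;

        public CacheSink(int partition) { this.partition = partition; }

        @Override
        public void invoke(byte[] record, Context context) throws Exception {
            // Blocking put backpressures the caching job when readers lag.
            queueFor(partition).put(record);
        }
    }

    /** Source for the query jobs: replays what the caching job stored. */
    public static class CacheSource implements SourceFunction<byte[]> {
        private final int partition;
        private volatile boolean running = true;

        public CacheSource(int partition) { this.partition = partition; }

        @Override
        public void run(SourceContext<byte[]> ctx) throws Exception {
            while (running) {
                byte[] record = queueFor(partition).poll(100, TimeUnit.MILLISECONDS);
                if (record != null) {
                    synchronized (ctx.getCheckpointLock()) {
                        ctx.collect(record);
                    }
                }
            }
        }

        @Override
        public void cancel() { running = false; }
    }
}

The reader jobs would then wrap CacheSource per partition and
reinterpret the result as a keyed stream [1], since the caching job
already shuffled the data.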
I'm very, very interested in your comments, community, about this idea
and its later productization.
Thanks!
Answering your comments:
Unless you need reprocessing for newly added rules, I'd probably
just cancel with savepoint and restart the application with the
new rules. Of course, it depends on the rules themselves and how
much state they require whether a restart is viable. That's up to a POC.
No, I don't need reprocessing (yet). A rule starts processing the
data from the moment it is defined.
Cancellation with savepoint was considered, but because the number
of new rules defined/changed daily might be large, that would
generate too much downtime. There is a lot of state kept in those
rules, making the restart heavy. What's worse, that would be
cross-tenant downtime, unless the job was somehow per team/tenant.
Therefore we rejected this option.
BTW, the current design of our system is similar to the one from the
blog series about the dynamic rules pattern [2] that Alexander Fedulov
is publishing right now.
They will consume the same highly intensive source(s), therefore
I want to optimize for that by consuming the messages in Flink
only once.
That's why I proposed to run one big query instead of 500 small
ones. Have a POC where you add two of your rules manually to a
Table and see what the optimized logical plan looks like. I'd bet
that the source is only tapped once.
I can do that PoC, no problem. But AFAIK it will only work with the
"restart with savepoint" pattern discussed above.
[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
[2] https://flink.apache.org/news/2020/03/24/demo-fraud-detection-2.html
On Wed, Mar 25, 2020 at 6:15 PM Krzysztof Zarzycki
<k.zarzy...@gmail.com> wrote:
Hello Arvid,
Thanks for joining the thread!
First, did you take into consideration that I would like to
dynamically add queries on the same source? That means first
defining one query, later in the day adding another one, then
another one, and so on. A week later killing one of those,
starting yet another one, etc. There will be hundreds of these
queries running at once, but the set of queries changes several
times a day.
They will consume the same highly intensive source(s), therefore
I want to optimize for that by consuming the messages in Flink
only once.
Regarding the temporary tables, AFAIK they hold only the
metadata (let's say the Kafka topic details) in the scope of a
SQL session. Therefore multiple queries against such a temp
table will behave the same way as querying a normal table, that
is, they will read the data source multiple times.
It looks like the feature I want, or could use, is described in
FLIP-36 about Interactive Programming, more precisely caching a
stream table [1]. That said, I wouldn't like to limit the
discussion to that not-yet-existing feature; maybe there are
other ways of achieving this dynamic querying capability.
Kind Regards,
Krzysztof
[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink#FLIP-36:SupportInteractiveProgramminginFlink-Cacheastreamtable
* You want to use primarily the Table API, as that allows
you to programmatically introduce structural variance
(changing rules).
* You start by registering the source as a temporary table.
* Then you add your rules as SQL through
`TableEnvironment#sqlQuery`.
* Lastly you unionAll the results.
Then I'd perform some experiments to check whether the
optimizer indeed figured out that it needs to read the
source only once.
The resulting code would be minimal and easy to maintain.
If the performance is not satisfying, you can always make
it more complicated.
Best,
Arvid
On Mon, Mar 23, 2020 at 7:02 PM Krzysztof Zarzycki
<k.zarzy...@gmail.com> wrote:
Dear Flink community!
In our company we have implemented a system that realizes
the dynamic business rules pattern. We spoke about it
during Flink Forward 2019:
https://www.youtube.com/watch?v=CyrQ5B0exqU.
The system is a great success and we would like to
improve it. Let me briefly describe what the system does:
* We have a Flink job with an engine that applies
business rules on multiple data streams. These rules
find patterns in the data and produce complex events on
those patterns.
* The engine is built on top of CoProcessFunction; the
rules are preimplemented using state and timers.
* The engine accepts control messages that deliver the
configuration of the rules and start the rule instances.
There might be many rule instances with different
configurations running in parallel (a rough analogue of
this pattern is sketched after this list).
* Data streams are routed to those rules, to all
instances.
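For readers unfamiliar with the pattern, here is a minimal
analogue using Flink's broadcast state. Our actual engine is
built on CoProcessFunction and is much more involved; all the
names below are made up:

import java.util.Map;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class DynamicRulesSketch {

    // Broadcast state holding the currently deployed rule instances.
    static final MapStateDescriptor<String, RuleConfig> RULES =
            new MapStateDescriptor<>(
                    "rules", Types.STRING, Types.POJO(RuleConfig.class));

    public static class RuleConfig {
        public String ruleId;
        public long threshold; // illustrative rule parameter
    }

    public static class Event {
        public String key;
        public long value;
    }

    /** Applies every registered rule instance to every keyed event. */
    public static class Engine
            extends KeyedBroadcastProcessFunction<String, Event, RuleConfig, String> {

        @Override
        public void processElement(
                Event event, ReadOnlyContext ctx, Collector<String> out)
                throws Exception {
            for (Map.Entry<String, RuleConfig> rule :
                    ctx.getBroadcastState(RULES).immutableEntries()) {
                if (event.value > rule.getValue().threshold) {
                    out.collect(rule.getKey() + " matched for key " + event.key);
                }
            }
        }

        @Override
        public void processBroadcastElement(
                RuleConfig rule, Context ctx, Collector<String> out)
                throws Exception {
            // A control message adds or updates a rule instance without
            // restarting the job.
            ctx.getBroadcastState(RULES).put(rule.ruleId, rule);
        }
    }
}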
The *advantages* of this design are:
* *The performance is superb.* The key to it is that we
read the data from the Kafka topic once, deserialize it
once, shuffle it once (thankfully we have one
partitioning key) and then apply over 100 rule
instances needing the same data.
* We are able to deploy multiple rule instances
dynamically without starting/stopping the job.
The performance is especially crucial: we have up to
500K events/s processed by around 100 rules on fewer
than 100 cores. I can't imagine having 100 Flink SQL
queries each consuming these streams from Kafka on
such a cluster.
The main *painpoints* of the design are:
* To deploy a new kind of business rule, we need to
predevelop the rule template with the use of our SDK. *We
can't use* the *great Flink CEP* and *Flink SQL
libraries*, which are getting stronger every day.
Flink SQL with MATCH_RECOGNIZE would fit perfectly for
our cases.
* The isolation of the rules is weak. There are many
rules running per job; if one fails, the whole job fails.
* There is one set of Kafka offsets, one watermark,
one checkpoint for all the rules.
* We have just one distribution key. Although that can
be overcome.
I would like to focus on solving the *first point*. We
can live with the rest.
*Question to the community*: Do you have ideas on how to
make it possible to develop the rules with the use of
Flink SQL with MATCH_RECOGNIZE?
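For illustration, here is the kind of rule we would love to
express directly in SQL. The schema, the time attribute
event_time and the thresholds are all made up:

// Illustrative only: a fraud-like pattern expressed with
// MATCH_RECOGNIZE, run against an assumed "events" table.
Table alerts = tEnv.sqlQuery(
        "SELECT * FROM events " +
        "MATCH_RECOGNIZE (" +
        "  PARTITION BY user_id " +
        "  ORDER BY event_time " +
        "  MEASURES A.amount AS first_amount, C.amount AS last_amount " +
        "  ONE ROW PER MATCH " +
        "  AFTER MATCH SKIP PAST LAST ROW " +
        "  PATTERN (A B* C) " +
        "  DEFINE " +
        "    A AS A.amount > 100, " +
        "    B AS B.amount > 100, " +
        "    C AS C.amount < 10" +
        ")");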
My current ideas are:
1. *A possibility to dynamically modify the job topology.*
Then I imagine dynamically attaching Flink SQL jobs to
the same Kafka sources.
2. *A possibility to save data streams internally to
Flink, predistributed*. Then Flink SQL queries should
be able to read these streams.
The ideal imaginary solution would be as simple to use as:
CREATE TABLE my_stream(...) WITH (<kafka properties>,
cached = 'true')
PARTITIONED BY my_partition_key
(the cached table could also be the result of CREATE TABLE
plus INSERT INTO my_stream_cached SELECT ... FROM
my_stream).
Then I can run multiple parallel Flink SQL queries
reading from that cached table in Flink.
Technical implementation: ideally, I imagine saving the
events in Flink state before they are consumed, and then
implementing a Flink source that can read the Flink
state of the state-filling job. It's a different job,
I know! Of course it needs to run on the same Flink
cluster.
A lot of options are possible: building on top of
Flink, modifying Flink (even keeping our own fork for
the time being), or using an external component.
In my opinion, the keys to maximizing performance are:
* avoiding pulling the data through the network from Kafka,
* avoiding deserialization of the messages for each of
the queries/processors.
Comments, ideas - any feedback is welcome!
Thank you!
Krzysztof
P.S. I'm writing to both the dev and user groups
because I suspect I would need to modify Flink to
achieve what I wrote above.