Dear Flink community!

At our company we have implemented a system that realizes the dynamic
business rules pattern. We spoke about it at Flink Forward 2019:
https://www.youtube.com/watch?v=CyrQ5B0exqU.
The system is a great success and we would like to improve it. Let me
briefly describe what it does:
* We have a Flink job with an engine that applies business rules to
multiple data streams. The rules find patterns in the data and produce
complex events from those patterns.
* The engine is built on top of CoProcessFunction; the rules are
pre-implemented using state and timers.
* The engine accepts control messages that deliver the configuration of
the rules and start rule instances. Many rule instances with different
configurations may run in parallel.
* The data streams are routed to all rule instances (see the sketch below).
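
For illustration, here is a heavily simplified sketch of the pattern (not
our actual engine; the Event/Rule/Engine names are made up, and I use
KeyedBroadcastProcessFunction here, while our engine is based on
CoProcessFunction):

import java.util.Map;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class RulesEngineSketch {

    // Broadcast state: one entry per deployed rule instance (its configuration).
    static final MapStateDescriptor<String, Rule> RULES =
            new MapStateDescriptor<>("rules", String.class, Rule.class);

    static DataStream<ComplexEvent> build(DataStream<Event> events, DataStream<Rule> control) {
        BroadcastStream<Rule> rules = control.broadcast(RULES);
        return events
                .keyBy(Event::getPartitionKey)   // the single partitioning key
                .connect(rules)
                .process(new Engine());
    }

    static class Engine extends KeyedBroadcastProcessFunction<String, Event, Rule, ComplexEvent> {

        @Override
        public void processElement(Event event, ReadOnlyContext ctx, Collector<ComplexEvent> out)
                throws Exception {
            // Every deployed rule instance sees the same, already-deserialized event.
            // (The real engine also gives rules access to keyed state and timers.)
            for (Map.Entry<String, Rule> entry : ctx.getBroadcastState(RULES).immutableEntries()) {
                entry.getValue().evaluate(event, out);
            }
        }

        @Override
        public void processBroadcastElement(Rule rule, Context ctx, Collector<ComplexEvent> out)
                throws Exception {
            // Control message: deploy or reconfigure a rule instance without restarting the job.
            ctx.getBroadcastState(RULES).put(rule.getId(), rule);
        }
    }

    // Illustrative types only.
    public interface Event { String getPartitionKey(); }
    public interface ComplexEvent {}
    public interface Rule extends java.io.Serializable {
        String getId();
        void evaluate(Event event, Collector<ComplexEvent> out);
    }
}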

The *advantages* of this design are:
* *The performance is superb.* The key to it is that we read the data from
the Kafka topic once, deserialize it once, shuffle it once (thankfully we
have a single partitioning key) and then apply over 100 rule instances that
need the same data.
* We can deploy new rule instances dynamically without stopping and
restarting the job.

The performance is especially crucial: we have up to 500K events/s
processed by about 100 rules on fewer than 100 cores. I can't imagine
running 100 separate Flink SQL queries, each consuming these streams from
Kafka, on such a cluster.

The main *pain points* of the design are:
* To deploy a new kind of business rule, we first need to develop a rule
template using our SDK. *We can't use the great Flink CEP and Flink SQL
libraries*, which are getting stronger every day. Flink SQL with
MATCH_RECOGNIZE would fit our cases perfectly.
* The isolation of the rules is weak. Many rules run in one job; if one
fails, the whole job fails.
* There is one set of Kafka offsets, one watermark and one checkpoint for
all the rules.
* We have just one distribution key, although that can be overcome.

I would like to focus on solving the *first point*. We can live with the
rest.
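
Just to illustrate the first point: with MATCH_RECOGNIZE, a rule like "a
run of small transactions followed by a very large one" would be a few
lines of SQL instead of a pre-developed template. A rough sketch (the
table, columns and thresholds are made up, and it assumes my_stream
already exists with event_time declared as a rowtime attribute):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MatchRecognizeRuleSketch {

    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // A hypothetical rule: small amounts (A, B*) followed by a very large one (C).
        tEnv.executeSql(
            "SELECT * FROM my_stream " +
            "MATCH_RECOGNIZE ( " +
            "  PARTITION BY my_partition_key " +
            "  ORDER BY event_time " +
            "  MEASURES A.event_time AS start_time, C.amount AS large_amount " +
            "  ONE ROW PER MATCH " +
            "  AFTER MATCH SKIP PAST LAST ROW " +
            "  PATTERN (A B* C) " +
            "  DEFINE " +
            "    A AS A.amount < 100, " +
            "    B AS B.amount < 100, " +
            "    C AS C.amount > 10000 " +
            ")").print();
    }
}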

*Question to the community*: Do you have ideas on how to make it possible
to develop the rules with Flink SQL and MATCH_RECOGNIZE?

My current ideas are:
1. *A possibility to dynamically modify the job topology.*
Then I imagine dynamically attaching Flink SQL jobs to the same Kafka
sources (there is a sketch of this further below, after point 2).

2. *A possibility to save data streams internally in Flink, predistributed.*
Flink SQL queries should then be able to read these streams.

The ideal, imaginary solution would be this simple to use:
CREATE TABLE my_stream (...)
PARTITIONED BY (my_partition_key)
WITH (<kafka properties>, 'cached' = 'true')

(The cached table could also be the result of a CREATE TABLE plus an INSERT
INTO my_stream_cached SELECT ... FROM my_stream.)

Then I could run multiple parallel Flink SQL queries reading from that
cached table inside Flink.
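
To give an idea of what "multiple parallel queries on the cached table"
could look like: below, several rule queries share my_stream_cached. A
StatementSet like this groups them into one job statically; what I'm
really after is being able to attach and detach such queries dynamically,
as in idea 1. Table, sink and column names are made up:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class SharedStreamQueriesSketch {

    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Assumes my_stream_cached, alerts_rule_1 and alerts_rule_2 were created already.
        StatementSet set = tEnv.createStatementSet();

        // Each statement is one "rule"; submitted together they can at least share
        // one scan of the source within this single job, but the set of queries is
        // fixed at submission time.
        set.addInsertSql(
            "INSERT INTO alerts_rule_1 " +
            "SELECT my_partition_key, amount FROM my_stream_cached WHERE amount > 10000");
        set.addInsertSql(
            "INSERT INTO alerts_rule_2 " +
            "SELECT my_partition_key, amount FROM my_stream_cached WHERE amount < 1");

        set.execute();
    }
}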

Technical implementation: ideally, I imagine saving the events in Flink
state before they are consumed, and then implementing a Flink source that
can read the Flink state of that state-filling job. It's a different job, I
know! Of course, it would need to run on the same Flink cluster.
Many options are possible: building on top of Flink, modifying Flink (even
maintaining our own fork for the time being), or using an external
component.
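
The state-filling half is easy to picture, something along these lines
(made-up names, and retention/cleanup is ignored). The missing half is a
source, in a different job, that could read this state while it is live:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class StreamCachingSketch {

    // Illustrative event type.
    public static class Event {
        public String partitionKey;
        public long eventTime;
        public double amount;
    }

    // The "state-filling" side: keep the already-deserialized, already-partitioned
    // events in keyed state, so hypothetical downstream readers would not need to
    // touch Kafka or deserialize anything again.
    public static class CacheInState extends KeyedProcessFunction<String, Event, Event> {

        private transient ListState<Event> buffered;

        @Override
        public void open(Configuration parameters) {
            buffered = getRuntimeContext().getListState(
                    new ListStateDescriptor<>("buffered-events", Event.class));
        }

        @Override
        public void processElement(Event event, Context ctx, Collector<Event> out) throws Exception {
            // A real version needs timer- or TTL-based cleanup; omitted here.
            buffered.add(event);
            out.collect(event);   // pass through for consumers in the same job
        }
    }
}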

In my opinion, the keys to maximizing performance are:
* avoiding pulling the data over the network from Kafka for every query,
* avoiding deserializing the messages for each of the queries/processors.

Comments, ideas - any feedback is welcome!
Thank you!
Krzysztof

P.S. I'm writing to both the dev and user mailing lists because I suspect I
would need to modify Flink to achieve what I wrote above.
