I agree with Dawid.

Maybe one thing to add is that reusing parts of the pipeline is possible via a StatementSet in TableEnvironment. It allows you to add multiple queries that consume from a common part of the pipeline (for example a common source). But all of that is compiled into one big job that is static at runtime; the queries are not isolated from each other.
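
For illustration, a minimal sketch of the StatementSet approach might look like the following. It assumes Flink 1.11 or later with the Table API planner on the classpath. The class name, table names and columns are made up for this example, and the built-in datagen, print and blackhole connectors merely stand in for a Kafka source and real sinks.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class SharedSourceStatementSet {

    // Registers the (hypothetical) tables and groups two INSERTs into one StatementSet.
    static StatementSet buildStatementSet(TableEnvironment tEnv) {
        // Common source; 'datagen' is only a stand-in for the Kafka topic.
        tEnv.executeSql(
            "CREATE TABLE events (user_id INT, amount DOUBLE) "
                + "WITH ('connector' = 'datagen', 'rows-per-second' = '5')");
        // Two independent sinks, one per query.
        tEnv.executeSql(
            "CREATE TABLE large_amounts (user_id INT, amount DOUBLE) "
                + "WITH ('connector' = 'print')");
        tEnv.executeSql(
            "CREATE TABLE all_events (user_id INT, amount DOUBLE) "
                + "WITH ('connector' = 'blackhole')");

        // Both queries read from 'events' and are planned together.
        StatementSet set = tEnv.createStatementSet();
        set.addInsertSql(
            "INSERT INTO large_amounts "
                + "SELECT user_id, amount FROM events WHERE amount > 100");
        set.addInsertSql(
            "INSERT INTO all_events SELECT user_id, amount FROM events");
        return set;
    }

    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().inStreamingMode().build());
        StatementSet set = buildStatementSet(tEnv);
        // Print the combined plan for both INSERTs.
        System.out.println(set.explain());
        // Submits both INSERTs together as a single job.
        set.execute();
    }
}
```

Adding a third query later means building a new StatementSet and submitting a new job, which is the static, non-isolated behaviour described above.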

One option is to introduce an additional Flink job that multiplexes the source Kafka topic into several Kafka topics, so that isolated jobs can read from this intermediate storage.

I hope this helps.

Regards,
Timo

On 24.11.20 16:54, Dawid Wysakowicz wrote:
Hi,

Really sorry for the late reply.

To the best of my knowledge there is no way to "attach" to a source/reader of a different job. Every job reads the source separately.

> `The GenericInMemoryCatalog is an in-memory
> implementation of a catalog. All objects will be available only for the
> lifetime of the session.` I presume, in session mode, we can share Kafka
> source for multiple SQL jobs?

Unfortunately this is a wrong assumption. Catalogs store metadata of tables, such as connection parameters, schema etc., not the data itself or parts of the graph. The information from a catalog can be used to create an execution graph that can be submitted to a cluster. It has nothing to do with a session cluster. The "session" here means a job, i.e. the lifetime of the GenericInMemoryCatalog.

> Both queries will share the same reader as they are part of a single job
> graph. Can we somehow take a snapshot of this and submit another query with
> them again under the same job graph?

Again, unfortunately there are no guarantees this will work. As of now it is a limitation of SQL that it does not support stateful upgrades of a graph or Flink version. As Till said, if the plan contains the same sub-plans they should be able to match. However, with such extensive changes to the graph I would not count on that happening. It can work for rather simple changes, e.g. changing a predicate (but even that can greatly affect the plan if the predicate could have been optimized). There were and there are some discussions going on to improve the situation here.

A proper solution to the problem for a STREAMING job would be rather hard in my opinion, as we would have to somehow keep the state of the shared source between multiple different jobs. We would need to know e.g. the offsets that a certain job consumed up to a certain checkpoint, and decide what to do if e.g. a particular query requests to start reading from offsets in the past.

There is some ongoing effort to support caching queries that could be reused between jobs in the same cluster, as part of better support for interactive programming [1], but I don't think it will support STREAMING mode.

Just as an aside: I am not a Spark expert and I might be completely wrong, but as far as I am familiar with Spark, it also does not support dynamically reusing streaming sources. It does have caching of intermediate shuffles implemented, something that FLIP-36 resembles.

Best regards,

Dawid

[1] https://cwiki.apache.org/confluence/x/8hclBg

On 23/11/2020 21:09, lalala wrote:
Hi Till,

Thank you for your comment. I am looking forward to hearing from Timo and
Dawid as well.

Best regards,
