Hi Jark, thanks for joining the discussion!
I understand your point of view that SQL environment is probably not the
best for what I was looking to achieve.
The idea of a configuration tool sounds almost perfect :) Almost , because:
Without the "StatementSet" that you mentioned at the end I would be worried
about resource consumption (job & task manager objects, buffers,
connections) of having one topology per table. That would be a significant
loss against architecture of Kafka Connect kind.
With StatementSet I understand this is not a case, but there is another
issue: We lose the dynamism. That is, the job won't be able to discover new
tables. We would need to always restart the whole (reconfigured)
StatementSet job. (Anyway, this approach sounds good enough to try it out
in my current assignment.)
The other issue I see is that I still need to define the DSL for the
configuration(sth like config of KConnect). SQL will not be it, it will
probably be barely a way to implement the tool.

I would appreciate your comments, Jark.
Also if anyone would like to add other ideas, feel welcome!

Best,
Krzysztof

śr., 4 lis 2020 o 09:37 Jark Wu <imj...@gmail.com> napisał(a):

> Hi Krzysztof,
>
> This is a very interesting idea.
>
> I think SQL is not a suitable tool for this use case, because SQL is a
> structured query language
>  where the table schema is fixed and never changes during job running.
>
> However, I think it can be a configuration tool project on top of Flink
> SQL.
> The configuration tool can dynamically generate all the queries according
> to the config
>  and submit them in one job.
>
> For example, if the configuration says "synchronize from mysql address
> 'xxxx' to kafka broker 'yyyy'",
> then the generated Flink SQL would like:
>
> CREATE TABLE db (
>   `database_name` STRING,
>   `table_name` STRING,
>   `data` BYTES  // encodes all the columns value, can be a better
> structure for performance
> ) WITH (
>   connector = ...   // a new connector scan all tables from the mysql
> address
>   url = 'jdbc:mysql://localhost:3306/flink-test'
> );
>
> // the configuration tool will generate multiple INSERT INTO according to
> how many tables in the DB
> INSERT INTO kafka_table1
> SELECT parse_data(table_name, data)   // the parse_data UDF will infer
> schema from database
> FROM db WHERE table = 'table1'            // or schema registry and
> deserialize the data into columns with different types.
>
> INSERT INTO kafka_table2
> SELECT parse_data(table_name, data)
> FROM db WHERE table = 'table2'
>
> ...
>
> The configuration tool can use `StatementSet` to package all the INSERT
> INTO queries together and submit them in one job.
> With the `StatementSet`, the job will share the common source task, so the
> tables in MySQL are only read once.
>
> Best,
> Jark
>
>
>
>
>
>
>
>
>
>
> On Tue, 3 Nov 2020 at 14:00, Krzysztof Zarzycki <k.zarzy...@gmail.com>
> wrote:
>
>> Hi community, I would like to confront one idea with you.
>>
>> I was thinking that Flink SQL could be a Flink's answer for Kafka Connect
>> (more powerful, with advantages like being decoupled from Kafka). Flink SQL
>> would be the configuration language for Flink "connectors", sounds great!.
>> But one thing does not allow me to implement this idea: There is no
>> possibility to run SQL-based processing over multiple similar inputs and
>> produce multiple similar outputs (counted in tens or hundreds).
>> As a problem example that I need to solve, consider that I have a hundred
>> of Kafka topics, with similar data in each. And I would like to sink them
>> to a SQL database. With Kafka connect, I can use a single connector with
>> JDBC sink, that properly configured will dump each topic to a separate
>> table properly keeping the schema (based on what is in the schema
>> registry).
>> With Flink SQL I would need to run a query per topic/table, I believe.
>> Similarly with sourcing data. There is this cool project
>> flink-cdc-connectors [1] that leverages Debezium in Flink to apply CDC on
>> SQL database, but when used with SQL, it can only pull in one table per
>> query.
>> These cases can be solved using the datastream API. With it I can code
>> pulling in/pushing out multiple table streams. But then "the configuration"
>> is a much bigger effort, because it requires using java code. And that is a
>> few hours vs few days case, an enormous difference.
>>
>> So in the end some questions:
>> * Do you know how SQL could be extended to support handling such cases
>> elegantly, with a single job in the end?
>> * Or do you believe SQL should not be used for that case and we should
>> come up with a different tool and configuration language? I.e. sth like
>> Kafka Connect
>> * Do you know of any other project that implements this idea?
>>
>> I definitely believe that this is a great use case for Flink to be an
>> easy-to-use ingress from/egress to Kafka/HDFS/whatever system, therefore
>> there is a need for a solution for my case.
>>
>> Thanks for answering!
>> Krzysztof
>>
>> [1] https://github.com/ververica/flink-cdc-connectors
>>
>

Reply via email to