I've created 3 tickets related to this discussion, feel free to comment on them:
1. https://issues.apache.org/jira/browse/FLINK-17358 - JDBCTableSource support FilterableTableSource
2. https://issues.apache.org/jira/browse/FLINK-17360 - Support custom partitioners in JDBCReadOptions
3. https://issues.apache.org/jira/browse/FLINK-17361 - Support creating a JDBC table using a custom query

Best,
Flavio

On Wed, Apr 22, 2020 at 4:29 PM Jingsong Li <jingsongl...@gmail.com> wrote:

> > Specify "query" and "provider"
> Yes, your proposal looks reasonable to me.
> The keys can be "scan.***", as in [1].
>
> > specify parameters
> Maybe we need to add something like "scan.parametervalues.provider.type", which
> can be "bound", "specify", or "custom":
> - when "bound", use the old partitionLowerBound, partitionUpperBound, and numPartitions
> - when "specify", use explicitly specified parameters, as in your proposal
> - when "custom", require "scan.parametervalues.provider.class"
>
> > not implement FilterableTableSource
> Just because we have not had time to finish it.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory
>
> Best,
> Jingsong Lee
>
> On Wed, Apr 22, 2020 at 9:49 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>
>> Ok, now I understand your proposal. However, this looks like a workaround
>> to me... I want to be able to give a name to such a table and also
>> register it in a catalog if I want.
>> Indeed, my proposal is to add a "connector.read.query" as an alternative
>> to "connector.table" (which forces you to map tables 1-to-1).
>> Then we can add a "connector.read.parametervalues.provider.class" in
>> order to customize the splitting of the query (we can also add a check
>> that the query contains at least 1 question mark).
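To make the split mechanism under discussion concrete, here is a minimal sketch: a user-supplied query containing `?` placeholders is executed once per parameter set produced by a parameters provider, the way each parallel reader would bind its own split. Plain Python with sqlite3 stands in for the JDBC driver, and `run_split_query` is an illustrative helper, not actual Flink API:

```python
import sqlite3

def run_split_query(conn, query, parameter_sets):
    """Run one parameterized query once per parameter set, emulating how
    each parallel reader would bind the parameters for its own split."""
    if query.count("?") == 0:
        raise ValueError("query must contain at least one '?' placeholder")
    rows = []
    for params in parameter_sets:
        rows.extend(conn.execute(query, params).fetchall())
    return rows

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER, d TEXT)")
conn.executemany("INSERT INTO t VALUES (?, ?)",
                 [(1, "2019-10-12"), (2, "2019-12-31"), (3, "2020-01-01")])

# Two "splits", each a (minDate, maxDate) pair bound to the two '?' marks.
splits = [("2019-10-01", "2019-12-31"), ("2020-01-01", "2020-06-30")]
rows = run_split_query(conn, "SELECT id FROM t WHERE d BETWEEN ? AND ?", splits)
print(sorted(r[0] for r in rows))  # → [1, 2, 3]
```

The question-mark check mirrors the validation suggested in the proposal above.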
>> If we introduce a custom parameters provider, we also need a way to
>> specify the parameters, using something like:
>>
>> 'connector.read.parametervalues.0.name' = 'minDate',
>> 'connector.read.parametervalues.0.value' = '12/10/2019',
>> 'connector.read.parametervalues.1.name' = 'maxDate',
>> 'connector.read.parametervalues.1.value' = '01/01/2020'
>>
>> Another question: why does the JDBC table source not implement
>> FilterableTableSource?
>>
>> On Wed, Apr 22, 2020 at 3:27 PM Jingsong Li <jingsongl...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Requirement: read data from "SELECT public.A.x, public.B.y FROM
>>> public.A JOIN public.B on public.A.pk = public.B.fk"
>>>
>>> Solution: table name = "(SELECT public.A.x, public.B.y FROM public.A
>>> JOIN public.B on public.A.pk = public.B.fk)"
>>>
>>> I don't see why there must be a 1-to-1 mapping between a Flink table
>>> and a JDBC table. If there is, there is no way to support this
>>> requirement, because this Flink table comes from two JDBC tables.
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Wed, Apr 22, 2020 at 8:42 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>
>>>> Sorry Jingsong, but I have to clarify this point, which is not clear
>>>> at all to me.
>>>>
>>>> From what I can see in the Table API documentation, there's currently
>>>> no way to associate an SQL query with a Flink table; there's a 1-to-1
>>>> mapping between a Flink table and a JDBC table.
>>>> This means that, at the moment, if I want to join 2 tables from the
>>>> same JDBC source (like in the example), Flink would fetch all the data
>>>> of the 2 tables and then do the join; it would not execute the query
>>>> directly and get the results back. Right?
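The indexed parameter-value properties proposed earlier in this message could be collected into an ordered parameter list along these lines; the property keys are the ones proposed in this thread, while the parsing helper itself is hypothetical:

```python
def parse_parameter_values(props):
    """Collect 'connector.read.parametervalues.<i>.name' and '<i>.value'
    pairs (keys as proposed in this thread) into an index-ordered list."""
    prefix = "connector.read.parametervalues."
    entries = {}
    for key, val in props.items():
        if not key.startswith(prefix):
            continue
        index_str, field = key[len(prefix):].split(".", 1)
        entries.setdefault(int(index_str), {})[field] = val
    return [(entries[i]["name"], entries[i]["value"]) for i in sorted(entries)]

props = {
    "connector.read.query": "SELECT * FROM t WHERE d BETWEEN ? AND ?",
    "connector.read.parametervalues.0.name": "minDate",
    "connector.read.parametervalues.0.value": "12/10/2019",
    "connector.read.parametervalues.1.name": "maxDate",
    "connector.read.parametervalues.1.value": "01/01/2020",
}
print(parse_parameter_values(props))
# → [('minDate', '12/10/2019'), ('maxDate', '01/01/2020')]
```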
>>>> If this is the case, we could open an issue so that the Blink
>>>> optimizer could improve performance by executing a query that involves
>>>> a single JDBC source directly against the database. That's one point.
>>>> Or maybe this is what you were trying to say with "Which means the
>>>> "select ..." is dynamically generated by the Flink sql. We can not set
>>>> it static."? Does it mean that we can't specify a query in a JDBC
>>>> table? This seems to contradict what you wrote in the statement
>>>> before: So this table name can be a rich sql: "(SELECT public.A.x,
>>>> public.B.y FROM public.A JOIN public.B on public.A.pk = public.B.fk)"
>>>>
>>>> I didn't understand what your proposal is here... I see two issues:
>>>>
>>>> 1. If a Flink table is mapped 1-to-1 with a JDBC table, are queries
>>>>    pushed down in a performant way? I.e., is SELECT public.A.x,
>>>>    public.B.y FROM public.A JOIN public.B on public.A.pk = public.B.fk
>>>>    executed efficiently on the DB, or is it performed in Flink after
>>>>    reading all the tables' data?
>>>> 2. Add a way to handle a custom parameter value provider class and
>>>>    query statements. What exactly is your proposal here?
>>>>
>>>> On Wed, Apr 22, 2020 at 1:03 PM Jingsong Li <jingsongl...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> You can configure a table name for the JDBC source.
>>>>> This table name can be a rich sql: "(SELECT public.A.x, public.B.y
>>>>> FROM public.A JOIN public.B on public.A.pk = public.B.fk)"
>>>>> So the final scan query statement will be: "select ... from (SELECT
>>>>> public.A.x, public.B.y FROM public.A JOIN public.B on public.A.pk =
>>>>> public.B.fk) where ..."
>>>>>
>>>>> Why not use this rich sql as the scan query statement?
>>>>> Because we have implemented projection push-down [1] in
>>>>> JDBCTableSource, the "select ..." part is dynamically generated by the
>>>>> Flink SQL planner. We cannot set it statically.
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#defining-a-tablesource-with-projection-push-down
>>>>>
>>>>> Best,
>>>>> Jingsong Lee
>>>>>
>>>>> On Wed, Apr 22, 2020 at 6:49 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>
>>>>>> Sorry Jingsong, but I didn't understand your reply... Can you
>>>>>> explain the following sentences better, please? Probably I'm missing
>>>>>> some Table API background here (I have only used JDBCOutputFormat).
>>>>>> "We cannot use a simple "scan.query.statement", because
>>>>>> JDBCTableSource also deals with projection push-down, which means the
>>>>>> SELECT part cannot be modified arbitrarily. Maybe you can configure a
>>>>>> rich table name for this."
>>>>>>
>>>>>> I can take care of opening the tickets, but I need to understand
>>>>>> exactly how many, and I need to be sure I explain the problem with the
>>>>>> correct terms.
>>>>>>
>>>>>> Best,
>>>>>> Flavio
>>>>>>
>>>>>> On Wed, Apr 22, 2020 at 11:52 AM Jingsong Li <jingsongl...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks for the explanation.
>>>>>>> You can create a JIRA for this.
>>>>>>>
>>>>>>> For "SELECT public.A.x, public.B.y FROM public.A JOIN public.B on
>>>>>>> public.A.pk = public.B.fk":
>>>>>>> We cannot use a simple "scan.query.statement", because
>>>>>>> JDBCTableSource also deals with projection push-down, which means the
>>>>>>> SELECT part cannot be modified arbitrarily.
>>>>>>> Maybe you can configure a rich table name for this.
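The "rich table name" trick discussed above can be illustrated with a toy example: the configured table "name" is itself a parenthesized query, and the dynamically generated SELECT simply wraps around it. Here sqlite3 stands in for the JDBC database, and the builder function and schema are illustrative only:

```python
import sqlite3

def build_scan_query(selected_columns, table_name, predicate=None):
    """Compose the scan statement the way described in the thread: the
    configured "table name" may itself be a parenthesized subquery, so
    projection push-down can still prepend its own generated SELECT."""
    query = "SELECT {} FROM {}".format(", ".join(selected_columns), table_name)
    if predicate:
        query += " WHERE " + predicate
    return query

# The table "name" is a join wrapped in parentheses, as suggested above.
rich_name = "(SELECT a.x, b.y FROM a JOIN b ON a.pk = b.fk)"
sql = build_scan_query(["x"], rich_name, "y > 10")

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE a (pk INTEGER, x INTEGER)")
conn.execute("CREATE TABLE b (fk INTEGER, y INTEGER)")
conn.execute("INSERT INTO a VALUES (1, 7)")
conn.execute("INSERT INTO b VALUES (1, 42)")
print(conn.execute(sql).fetchall())  # → [(7,)]
```

Because the subquery occupies the FROM position, both the projected column list and the WHERE clause remain under the planner's control, which is the constraint described above.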
>>>>>>>
>>>>>>> Best,
>>>>>>> Jingsong Lee
>>>>>>>
>>>>>>> On Wed, Apr 22, 2020 at 5:24 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>>
>>>>>>>> Because in my use case the parallelism was not based on a range of
>>>>>>>> keys/numbers but on a range of dates, so I needed a custom
>>>>>>>> parameters provider.
>>>>>>>> As for push-down, I don't know how Flink/Blink currently works...
>>>>>>>> For example, let's say I have a Postgres catalog containing 2
>>>>>>>> tables (public.A and public.B).
>>>>>>>> If I run the following query: SELECT public.A.x, public.B.y FROM
>>>>>>>> public.A JOIN public.B on public.A.pk = public.B.fk
>>>>>>>> will this be pushed down as a single query, or will Flink fetch
>>>>>>>> both tables and then perform the join?
>>>>>>>> Talking with Bowen, I understood that to avoid this I could define
>>>>>>>> a VIEW in the db (but this is not always possible) or in Flink (but
>>>>>>>> from what I know this is very costly).
>>>>>>>> In this case a parameter "scan.query.statement" without a
>>>>>>>> "scan.parameter.values.provider.class" is super helpful and could
>>>>>>>> improve performance a lot!
>>>>>>>>
>>>>>>>> On Wed, Apr 22, 2020 at 11:06 AM Jingsong Li <jingsongl...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> You are right about the lower and upper bounds; they are a must
>>>>>>>>> for parallelizing the fetch of the data.
>>>>>>>>> And filter push-down is used to filter more data on the JDBC
>>>>>>>>> server.
>>>>>>>>>
>>>>>>>>> Yes, we can provide "scan.query.statement" and
>>>>>>>>> "scan.parameter.values.provider.class" for the JDBC connector. But
>>>>>>>>> maybe we need to be careful about such an overly flexible API.
>>>>>>>>>
>>>>>>>>> Can you provide more details about your case? Why can it not be
>>>>>>>>> solved by lower and upper bounds with filter push-down?
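A custom parameters provider over a date range, as in the use case described above, might generate its splits along these lines; this is only a sketch of the idea, not the Flink provider interface:

```python
from datetime import date, timedelta

def date_range_splits(start, end, num_splits):
    """Divide the inclusive date range [start, end] into num_splits
    contiguous (lo, hi) pairs, each meant to be bound to one split of a
    'WHERE d BETWEEN ? AND ?' prepared statement. Illustrative only."""
    total_days = (end - start).days + 1
    step = -(-total_days // num_splits)  # ceiling division
    splits = []
    lo = start
    while lo <= end:
        hi = min(lo + timedelta(days=step - 1), end)
        splits.append((lo.isoformat(), hi.isoformat()))
        lo = hi + timedelta(days=1)
    return splits

print(date_range_splits(date(2019, 10, 12), date(2020, 1, 1), 3))
```

Each (lo, hi) pair would then be bound by one parallel reader, replacing the purely numeric splitting that the BETWEEN-based provider assumes.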
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Jingsong Lee
>>>>>>>>>
>>>>>>>>> On Wed, Apr 22, 2020 at 4:45 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>>>>
>>>>>>>>>> Maybe I am wrong, but filter push-down support for JDBC is one
>>>>>>>>>> thing (that is probably useful), while parameters providers are
>>>>>>>>>> required if you want to parallelize the fetch of the data.
>>>>>>>>>> You are not forced to use NumericBetweenParametersProvider; you
>>>>>>>>>> can use whatever ParametersProvider you prefer, depending on the
>>>>>>>>>> statement you have.
>>>>>>>>>> Or do you have something else in mind?
>>>>>>>>>>
>>>>>>>>>> On Wed, Apr 22, 2020 at 10:33 AM Jingsong Li <jingsongl...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> Now, in JDBCTableSource.getInputFormat, the clause is written
>>>>>>>>>>> explicitly: WHERE XXX BETWEEN ? AND ?. So we must use
>>>>>>>>>>> NumericBetweenParametersProvider.
>>>>>>>>>>> I don't think this is a good long-term solution.
>>>>>>>>>>> I think we should support filter push-down for JDBCTableSource,
>>>>>>>>>>> so that we can write the filters we want. What do you think?
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Jingsong Lee
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Apr 21, 2020 at 10:00 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi all,
>>>>>>>>>>>> we have a use case where we have a prepared statement that we
>>>>>>>>>>>> parameterize using a custom parameters provider (similar to
>>>>>>>>>>>> what happens in
>>>>>>>>>>>> testJDBCInputFormatWithParallelismAndNumericColumnSplitting [1]).
>>>>>>>>>>>> How can we handle this using the JDBC Table API?
>>>>>>>>>>>> What should we do to handle such a use case? Is there anyone
>>>>>>>>>>>> willing to mentor us in its implementation?
>>>>>>>>>>>>
>>>>>>>>>>>> Another question: why has flink-jdbc not been renamed to
>>>>>>>>>>>> flink-connector-jdbc?
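For reference, the kind of numeric splitting that NumericBetweenParametersProvider performs for the WHERE XXX BETWEEN ? AND ? clause can be sketched roughly like this (the exact boundary arithmetic in Flink's implementation may differ):

```python
def numeric_between_splits(lower, upper, num_partitions):
    """Produce one (lo, hi) bound pair per partition, together covering
    the inclusive range [lower, upper]; early partitions absorb any
    remainder so sizes differ by at most one."""
    total = upper - lower + 1
    base = total // num_partitions
    remainder = total % num_partitions
    splits, lo = [], lower
    for i in range(num_partitions):
        size = base + (1 if i < remainder else 0)
        splits.append((lo, lo + size - 1))
        lo += size
    return splits

print(numeric_between_splits(0, 9, 3))  # → [(0, 3), (4, 6), (7, 9)]
```

Each pair is then bound to the two `?` marks of the BETWEEN clause, one partition per parallel reader, which is exactly why only numerically splittable columns work with the hard-coded clause described above.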
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks in advance,
>>>>>>>>>>>> Flavio
>>>>>>>>>>>>
>>>>>>>>>>>> [1]
>>>>>>>>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Best, Jingsong Lee