> Specify "query" and "provider" Yes, your proposal looks reasonable to me. Key can be "scan.***" like in [1].
> specify parameters

Maybe we need to add something like "scan.parametervalues.provider.type", which could be "bound", "specify" or "custom":
- when "bound", use the old partitionLowerBound, partitionUpperBound and numPartitions
- when "specify", use explicitly specified parameters, like in your proposal
- when "custom", a "scan.parametervalues.provider.class" is also needed

> not implement FilterableTableSource

Just because we have not had time to finish it.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory

Best,
Jingsong Lee

On Wed, Apr 22, 2020 at 9:49 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:

> Ok, now I understand your proposal. However, this looks like a workaround to me: I want to be able to give a name to such a table and also register it in a catalog if I want.
> Indeed, my proposal is to add a "connector.read.query" property as an alternative to "connector.table" (which forces you to map tables 1-to-1).
> Then we can add a "connector.read.parametervalues.provider.class" in order to customize the splitting of the query (we could also add a check that the query contains at least one question mark).
> If we introduce a custom parameters provider, we also need a way to specify the parameters, using something like:
>
>   'connector.read.parametervalues.0.name' = 'minDate',
>   'connector.read.parametervalues.0.value' = '12/10/2019',
>   'connector.read.parametervalues.1.name' = 'maxDate',
>   'connector.read.parametervalues.1.value' = '01/01/2020'
>
> Another question: why does the JDBC table source not implement FilterableTableSource?
>
> On Wed, Apr 22, 2020 at 3:27 PM Jingsong Li <jingsongl...@gmail.com> wrote:
>
>> Hi,
>>
>> Requirement: read data from "SELECT public.A.x, public.B.y FROM public.A JOIN public.B on public.A.pk = public.B.fk".
>>
>> Solution: table name = "(SELECT public.A.x, public.B.y FROM public.A JOIN public.B on public.A.pk = public.B.fk)"
>>
>> I don't know why there should be a 1-to-1 mapping between a Flink table and a JDBC table. If there were, there would be no way to support this requirement, because this Flink table comes from two JDBC tables.
>>
>> Best,
>> Jingsong Lee
>>
>> On Wed, Apr 22, 2020 at 8:42 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>
>>> Sorry Jingsong, but I have to clarify this point, which is not clear at all to me.
>>>
>>> From what I can see in the Table API documentation, there is currently no way to associate an SQL query with a Flink table; there is a 1-to-1 mapping between a Flink table and a JDBC table.
>>> This means that, at the moment, if I want to join 2 tables from the same JDBC source (like in the example), Flink would fetch all the data of the 2 tables and then do the join; it would not execute the query directly on the database and get the results back. Right?
>>> If this is the case, we could open an issue for the Blink optimizer: it could improve performance a lot if a query involving a single JDBC source were executed directly on the database. That's one point.
>>> Or maybe this is what you were trying to say with "Which means the "select ..." is dynamically generated by the Flink sql. We can not set it static."? Does it mean that we can't specify a query in a JDBC table?
>>> This seems to go against what you wrote in your earlier statement: "So this table name can be a rich sql: '(SELECT public.A.x, public.B.y FROM public.A JOIN public.B on public.A.pk = public.B.fk)'".
>>>
>>> I didn't understand what your proposal is here. I see two issues:
>>>
>>> 1. If a Flink table is mapped 1-to-1 with a JDBC table, are queries pushed down in a performant way?
>>>    i.e. is "SELECT public.A.x, public.B.y FROM public.A JOIN public.B on public.A.pk = public.B.fk" executed efficiently on the database, or is it performed in Flink after reading all the data of both tables?
>>> 2. Add a way to handle a custom parameter values provider class and query statements. What exactly is your proposal here?
>>>
>>> On Wed, Apr 22, 2020 at 1:03 PM Jingsong Li <jingsongl...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> You can configure the table name for the JDBC source.
>>>> So this table name can be a rich sql: "(SELECT public.A.x, public.B.y FROM public.A JOIN public.B on public.A.pk = public.B.fk)"
>>>> So the final scan query statement will be: "select ... from (SELECT public.A.x, public.B.y FROM public.A JOIN public.B on public.A.pk = public.B.fk) where ..."
>>>>
>>>> Why not make this rich sql the scan query statement itself? Because we have implemented projection pushdown [1] in JDBCTableSource.
>>>> Which means the "select ..." is dynamically generated by the Flink sql. We can not set it static.
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#defining-a-tablesource-with-projection-push-down
>>>>
>>>> Best,
>>>> Jingsong Lee
>>>>
>>>> On Wed, Apr 22, 2020 at 6:49 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>
>>>>> Sorry Jingsong, but I didn't understand your reply. Can you explain the following sentences better, please? Probably I am missing some Table API background here (I have only used JDBCOutputFormat).
>>>>> "We can not use a simple "scan.query.statement", because JDBCTableSource also deals with projection pushdown, which means the select part can not be modified casually. Maybe you can configure a rich table name for this."
>>>>>
>>>>> I can take care of opening the tickets, but I need to understand exactly how many, and I need to be sure of explaining the problem with the correct terms.
>>>>>
>>>>> Best,
>>>>> Flavio
>>>>>
>>>>> On Wed, Apr 22, 2020 at 11:52 AM Jingsong Li <jingsongl...@gmail.com> wrote:
>>>>>
>>>>>> Thanks for the explanation.
>>>>>> You can create a JIRA for this.
>>>>>>
>>>>>> For "SELECT public.A.x, public.B.y FROM public.A JOIN public.B on public.A.pk = public.B.fk":
>>>>>> We can not use a simple "scan.query.statement", because JDBCTableSource also deals with projection pushdown, which means the select part can not be modified casually.
>>>>>> Maybe you can configure a rich table name for this.
>>>>>>
>>>>>> Best,
>>>>>> Jingsong Lee
>>>>>>
>>>>>> On Wed, Apr 22, 2020 at 5:24 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>
>>>>>>> Because in my use case the parallelism was not based on a range of keys/numbers but on a range of dates, so I needed a custom parameters provider.
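>>>>>>> To make the idea concrete, such a provider could look roughly like the following (just an untested sketch against the flink-jdbc ParameterValuesProvider interface; the class name, the month-based splitting and the date column are made up):
>>>>>>>
>>>>>>> import java.io.Serializable;
>>>>>>> import java.time.LocalDate;
>>>>>>> import java.time.temporal.ChronoUnit;
>>>>>>> import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;
>>>>>>>
>>>>>>> // Hypothetical provider: one split per month between two dates, each row
>>>>>>> // binding the two '?' of a query like "... WHERE event_date BETWEEN ? AND ?".
>>>>>>> public class DateRangeParametersProvider implements ParameterValuesProvider {
>>>>>>>
>>>>>>>     private final LocalDate start;
>>>>>>>     private final LocalDate end;
>>>>>>>
>>>>>>>     public DateRangeParametersProvider(LocalDate start, LocalDate end) {
>>>>>>>         this.start = start;
>>>>>>>         this.end = end;
>>>>>>>     }
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public Serializable[][] getParameterValues() {
>>>>>>>         int months = (int) ChronoUnit.MONTHS.between(start, end) + 1;
>>>>>>>         Serializable[][] parameters = new Serializable[months][2];
>>>>>>>         for (int i = 0; i < months; i++) {
>>>>>>>             LocalDate from = start.plusMonths(i);
>>>>>>>             LocalDate to = from.plusMonths(1).minusDays(1);
>>>>>>>             parameters[i][0] = java.sql.Date.valueOf(from);
>>>>>>>             parameters[i][1] = java.sql.Date.valueOf(to);
>>>>>>>         }
>>>>>>>         return parameters;
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>> Each row of that array then parameterizes one split of the prepared statement.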
>>>>>>> Regarding pushdown, I don't know how Flink/Blink currently works. For example, let's say I have a Postgres catalog containing 2 tables (public.A and public.B).
>>>>>>> If I run the following query: SELECT public.A.x, public.B.y FROM public.A JOIN public.B on public.A.pk = public.B.fk,
>>>>>>> will this be pushed down as a single query, or will Flink fetch both tables and then perform the join?
>>>>>>> Talking with Bowen, I understood that to avoid this I could define a VIEW in the database (but this is not always possible) or in Flink (but, from what I know, this is very costly).
>>>>>>> In this case a "scan.query.statement" parameter, even without a "scan.parameter.values.provider.class", would be super helpful and could improve performance a lot!
>>>>>>>
>>>>>>> On Wed, Apr 22, 2020 at 11:06 AM Jingsong Li <jingsongl...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> You are right about the lower and upper bounds: they are a must to parallelize the fetching of the data.
>>>>>>>> And filter pushdown is used to filter more data on the JDBC server side.
>>>>>>>>
>>>>>>>> Yes, we can provide "scan.query.statement" and "scan.parameter.values.provider.class" for the JDBC connector. But maybe we need to be careful about such a flexible API.
>>>>>>>>
>>>>>>>> Can you tell us more about your case? Why can it not be solved by lower and upper bounds plus filter pushdown?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Jingsong Lee
>>>>>>>>
>>>>>>>> On Wed, Apr 22, 2020 at 4:45 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>>>
>>>>>>>>> Maybe I am wrong, but pushdown support for JDBC is one thing (and probably useful), while parameters providers are required if you want to parallelize the fetching of the data.
>>>>>>>>> You are not forced to use NumericBetweenParametersProvider; you can use whatever ParametersProvider you prefer, depending on the statement you have.
>>>>>>>>> Or do you have something else in mind?
>>>>>>>>>
>>>>>>>>> On Wed, Apr 22, 2020 at 10:33 AM Jingsong Li <jingsongl...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> Right now, in JDBCTableSource.getInputFormat, the split condition is written explicitly as "WHERE XXX BETWEEN ? AND ?", so we must use `NumericBetweenParametersProvider`.
>>>>>>>>>> I don't think this is a good long-term solution.
>>>>>>>>>> I think we should support filter push-down for JDBCTableSource, so that we can write the filters that we want. What do you think?
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Jingsong Lee
>>>>>>>>>>
>>>>>>>>>> On Tue, Apr 21, 2020 at 10:00 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi all,
>>>>>>>>>>> we have a use case where we have a prepared statement that we parameterize using a custom parameters provider (similar to what happens in testJDBCInputFormatWithParallelismAndNumericColumnSplitting [1]).
>>>>>>>>>>> How can we handle this using the JDBC Table API?
>>>>>>>>>>> What should we do to handle such a use case? Is there anyone willing to mentor us in its implementation?
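>>>>>>>>>>> For context, what we do today with the DataSet API looks roughly like the following (only a sketch: the driver, URL, query and the inline provider are made up, and our real provider computes date ranges):
>>>>>>>>>>>
>>>>>>>>>>> import java.io.Serializable;
>>>>>>>>>>> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
>>>>>>>>>>> import org.apache.flink.api.java.ExecutionEnvironment;
>>>>>>>>>>> import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
>>>>>>>>>>> import org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider;
>>>>>>>>>>> import org.apache.flink.api.java.typeutils.RowTypeInfo;
>>>>>>>>>>>
>>>>>>>>>>> public class JdbcParallelReadSketch {
>>>>>>>>>>>     public static void main(String[] args) throws Exception {
>>>>>>>>>>>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>>
>>>>>>>>>>>         JDBCInputFormat inputFormat = JDBCInputFormat.buildJDBCInputFormat()
>>>>>>>>>>>             .setDrivername("org.postgresql.Driver")
>>>>>>>>>>>             .setDBUrl("jdbc:postgresql://localhost:5432/mydb")
>>>>>>>>>>>             .setQuery("SELECT id, name FROM public.A WHERE created_at BETWEEN ? AND ?")
>>>>>>>>>>>             // custom provider: one split per date range instead of a numeric range
>>>>>>>>>>>             .setParametersProvider(new ParameterValuesProvider() {
>>>>>>>>>>>                 @Override
>>>>>>>>>>>                 public Serializable[][] getParameterValues() {
>>>>>>>>>>>                     return new Serializable[][] {
>>>>>>>>>>>                         {java.sql.Date.valueOf("2019-10-12"), java.sql.Date.valueOf("2019-12-31")},
>>>>>>>>>>>                         {java.sql.Date.valueOf("2020-01-01"), java.sql.Date.valueOf("2020-03-31")}
>>>>>>>>>>>                     };
>>>>>>>>>>>                 }
>>>>>>>>>>>             })
>>>>>>>>>>>             .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO))
>>>>>>>>>>>             .finish();
>>>>>>>>>>>
>>>>>>>>>>>         env.createInput(inputFormat).print();
>>>>>>>>>>>     }
>>>>>>>>>>> }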
>>>>>>>>>>> Another question: why has flink-jdbc not been renamed to flink-connector-jdbc?
>>>>>>>>>>>
>>>>>>>>>>> Thanks in advance,
>>>>>>>>>>> Flavio
>>>>>>>>>>>
>>>>>>>>>>> [1]
>>>>>>>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java

--
Best, Jingsong Lee