I've created 3 tickets related to this discussion; feel free to comment on them:

   1. https://issues.apache.org/jira/browse/FLINK-17358 - JDBCTableSource
   support FilterableTableSource
   2. https://issues.apache.org/jira/browse/FLINK-17360 - Support custom
   partitioners in JDBCReadOptions
   3. https://issues.apache.org/jira/browse/FLINK-17361 - Support creating of
   a JDBC table using a custom query
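
As an illustration for the second ticket, here is a minimal sketch of the
splitting a date-based parameters provider could perform. It is not the Flink
API: `DateRangeSplits` and `split` are hypothetical names, and the only thing
borrowed from flink-jdbc is the `Serializable[][]` shape (one row of parameter
values per split) that a parameter values provider returns. Each row would
fill the two question marks of a query such as "... WHERE d BETWEEN ? AND ?".

```java
import java.io.Serializable;
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: splits [minDate, maxDate] into contiguous date ranges. */
final class DateRangeSplits {

    private DateRangeSplits() {}

    /**
     * Returns one {start, end} pair per split, both bounds inclusive.
     * If numSplits exceeds the number of days, one split per day is returned.
     */
    static Serializable[][] split(LocalDate minDate, LocalDate maxDate, int numSplits) {
        if (numSplits < 1 || maxDate.isBefore(minDate)) {
            throw new IllegalArgumentException("invalid range or split count");
        }
        long totalDays = ChronoUnit.DAYS.between(minDate, maxDate) + 1;
        long splits = Math.min(numSplits, totalDays);
        long base = totalDays / splits;       // minimum days per split
        long remainder = totalDays % splits;  // first splits get one extra day

        List<Serializable[]> result = new ArrayList<>();
        LocalDate start = minDate;
        for (long i = 0; i < splits; i++) {
            long size = base + (i < remainder ? 1 : 0);
            LocalDate end = start.plusDays(size - 1);
            result.add(new Serializable[] {start, end});
            start = end.plusDays(1);          // next split starts the day after
        }
        return result.toArray(new Serializable[0][]);
    }
}
```

For example, DateRangeSplits.split(LocalDate.of(2019, 10, 12),
LocalDate.of(2020, 1, 1), 4) yields four contiguous, non-overlapping ranges
that cover the whole interval, so each parallel reader would fetch a disjoint
slice of the data.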

Best,
Flavio

On Wed, Apr 22, 2020 at 4:29 PM Jingsong Li <jingsongl...@gmail.com> wrote:

> > Specify "query" and "provider"
> Yes, your proposal looks reasonable to me.
> The keys can be "scan.***", as in [1].
>
> > specify parameters
> Maybe we need to add something like "scan.parametervalues.provider.type"; it
> can be "bound", "specify" or "custom":
> - when "bound", use the old partitionLowerBound, partitionUpperBound
> and numPartitions
> - when "specify", use explicitly specified parameters, as in your proposal
> - when "custom", "scan.parametervalues.provider.class" is required
>
> > not implement FilterableTableSource
> Just because we have not had time to finish it.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory
>
> Best,
> Jingsong Lee
>
> On Wed, Apr 22, 2020 at 9:49 PM Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
>> Ok, now I understand your proposal. However, this looks like a workaround
>> to me. I want to be able to give a name to such a table and also register
>> it in a catalog if I want.
>> Indeed, my proposal is to add a "*connector.read.query*" as an
>> alternative to "connector.table" (which forces you to map tables 1-to-1).
>> Then we can add a *connector.read.parametervalues.provider.class* in
>> order to customize the splitting of the query (we can also add a check that
>> the query contains at least 1 question mark).
>> If we introduce a custom parameters provider, we also need to specify
>> parameters, using something like:
>>
>> 'connector.read.parametervalues.0.name' = 'minDate',
>> 'connector.read.parametervalues.0.value' = '12/10/2019',
>> 'connector.read.parametervalues.1.name' = 'maxDate',
>> 'connector.read.parametervalues.1.value' = '01/01/2020'
>>
>> Another question: why does the JDBC table source not implement
>> *FilterableTableSource*?
>>
>> On Wed, Apr 22, 2020 at 3:27 PM Jingsong Li <jingsongl...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Requirements: read data from "SELECT public.A.x, public.B.y FROM
>>> public.A JOIN public.B on public.A.pk = public.B.fk"
>>>
>>> Solution: table name = "(SELECT public.A.x, public.B.y FROM public.A
>>> JOIN public.B on public.A.pk = public.B.fk)"
>>>
>>> I don't know why there would be a 1-to-1 mapping between a Flink table
>>> and a JDBC table. If there were, there would be no way to support this
>>> requirement, because this Flink table comes from two JDBC tables.
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Wed, Apr 22, 2020 at 8:42 PM Flavio Pompermaier <pomperma...@okkam.it>
>>> wrote:
>>>
>>>> Sorry Jingsong, but I have to clarify this point, which is not clear at
>>>> all to me.
>>>>
>>>> From what I can see in the Table API documentation, there's (currently)
>>>> no way to associate an SQL query with a Flink table: there's a 1-to-1
>>>> mapping between a Flink table and a JDBC table.
>>>> This means that, at the moment, if I want to join 2 tables from the
>>>> same JDBC source (like in the example), Flink would fetch all the data of
>>>> the 2 tables and then do the join; it would not execute the query
>>>> directly on the database and get the results back. Right?
>>>> If this is the case, we could open an issue for the Blink optimizer, which
>>>> could improve performance if a query that involves a single JDBC source
>>>> were executed directly on the database. That's one point.
>>>> Or maybe this is what you were trying to say with "This means the
>>>> "select ..." is dynamically generated by Flink SQL. We cannot set it
>>>> statically."? Does it mean that we can't specify a query in a JDBC table?
>>>> This seems to contradict what you wrote in the preceding statement: So
>>>> this table name can be a rich sql: "(SELECT public.A.x, public.B.y FROM
>>>> public.A JOIN public.B on public.A.pk = public.B.fk)"
>>>>
>>>> I didn't understand what your proposals are here. I see two issues:
>>>>
>>>>    1. If a Flink table is mapped 1-to-1 to a JDBC table, are queries
>>>>    pushed down in a performant way?
>>>>       1. i.e. is SELECT public.A.x, public.B.y FROM public.A JOIN
>>>>       public.B on public.A.pk = public.B.fk executed efficiently on the
>>>>       DB, or is it performed in Flink after reading all the tables' data?
>>>>    2. Add a way to handle a custom parameter values provider class and
>>>>    query statements. What exactly is your proposal here?
>>>>
>>>>
>>>> On Wed, Apr 22, 2020 at 1:03 PM Jingsong Li <jingsongl...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> You can configure the table name for the JDBC source.
>>>>> So this table name can be a rich sql: "(SELECT public.A.x, public.B.y
>>>>> FROM public.A JOIN public.B on public.A.pk = public.B.fk)"
>>>>> So the final scan query statement will be: "select ... from (SELECT
>>>>> public.A.x, public.B.y FROM public.A JOIN public.B on public.A.pk
>>>>> = public.B.fk) where ..."
>>>>>
>>>>> Why not use this rich sql as the scan query statement? Because we have
>>>>> implemented projection pushdown [1] in JDBCTableSource.
>>>>> This means the "select ..." is dynamically generated by Flink SQL.
>>>>> We cannot set it statically.
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#defining-a-tablesource-with-projection-push-down
>>>>>
>>>>> Best,
>>>>> Jingsong Lee
>>>>>
>>>>> On Wed, Apr 22, 2020 at 6:49 PM Flavio Pompermaier <
>>>>> pomperma...@okkam.it> wrote:
>>>>>
>>>>>> Sorry Jingsong, but I didn't understand your reply. Can you explain
>>>>>> the following sentences better, please? I am probably missing some Table
>>>>>> API background here (I have only used JDBCOutputFormat).
>>>>>> "We cannot use a simple "scan.query.statement", because
>>>>>> JDBCTableSource also deals with projection pushdown. This means that
>>>>>> the select part cannot be modified casually.
>>>>>> Maybe you can configure a rich table name for this."
>>>>>>
>>>>>> I can take care of opening tickets, but I need to understand exactly
>>>>>> how many, and I need to be sure I explain the problem in the correct
>>>>>> terms.
>>>>>>
>>>>>> Best,
>>>>>> Flavio
>>>>>>
>>>>>> On Wed, Apr 22, 2020 at 11:52 AM Jingsong Li <jingsongl...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks for the explanation.
>>>>>>> You can create JIRA for this.
>>>>>>>
>>>>>>> For "SELECT public.A.x, public.B.y FROM public.A JOIN public.B on
>>>>>>> public.A.pk = public.B.fk":
>>>>>>> We cannot use a simple "scan.query.statement", because
>>>>>>> JDBCTableSource also deals with projection pushdown. This means that
>>>>>>> the select part cannot be modified casually.
>>>>>>> Maybe you can configure a rich table name for this.
>>>>>>>
>>>>>>> Best,
>>>>>>> Jingsong Lee
>>>>>>>
>>>>>>> On Wed, Apr 22, 2020 at 5:24 PM Flavio Pompermaier <
>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>
>>>>>>>> Because in my use case the parallelism was not based on a range of
>>>>>>>> keys/numbers but on a range of dates, so I needed a custom parameters
>>>>>>>> provider.
>>>>>>>> Regarding pushdown, I don't know how Flink/Blink currently works.
>>>>>>>> For example, let's say I have a Postgres catalog containing 2 tables
>>>>>>>> (public.A and public.B).
>>>>>>>> If I run the following query: SELECT public.A.x, public.B.y FROM
>>>>>>>> public.A JOIN public.B on public.A.pk = public.B.fk.
>>>>>>>> Will this be pushed down as a single query, or will Flink fetch both
>>>>>>>> tables and then perform the join?
>>>>>>>> Talking with Bowen, I understood that to avoid this I could define a
>>>>>>>> VIEW in the DB (but this is not always possible) or in Flink (but from
>>>>>>>> what I know this is very costly).
>>>>>>>> In this case a parameter "scan.query.statement" without a
>>>>>>>> "scan.parameter.values.provider.class" would be super helpful and
>>>>>>>> could improve performance a lot!
>>>>>>>>
>>>>>>>> On Wed, Apr 22, 2020 at 11:06 AM Jingsong Li <
>>>>>>>> jingsongl...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> You are right about the lower and upper bounds: they are a must to
>>>>>>>>> parallelize the fetch of the data.
>>>>>>>>> And filter pushdown is used to filter more data at the JDBC server.
>>>>>>>>>
>>>>>>>>> Yes, we can provide "scan.query.statement" and
>>>>>>>>> "scan.parameter.values.provider.class" for the JDBC connector. But
>>>>>>>>> maybe we need to be careful about such a flexible API.
>>>>>>>>>
>>>>>>>>> Can you tell us more about your case? Why can it not be solved by
>>>>>>>>> lower and upper bounds with filter pushdown?
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Jingsong Lee
>>>>>>>>>
>>>>>>>>> On Wed, Apr 22, 2020 at 4:45 PM Flavio Pompermaier <
>>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>>
>>>>>>>>>> Maybe I am wrong, but supporting pushdown for JDBC is one thing
>>>>>>>>>> (and is probably useful), while parameters providers are required if
>>>>>>>>>> you want to parallelize the fetch of the data.
>>>>>>>>>> You are not required to use NumericBetweenParametersProvider; you
>>>>>>>>>> can use the ParametersProvider you prefer, depending on the
>>>>>>>>>> statement you have.
>>>>>>>>>> Or do you have something else in mind?
>>>>>>>>>>
>>>>>>>>>> On Wed, Apr 22, 2020 at 10:33 AM Jingsong Li <
>>>>>>>>>> jingsongl...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> Currently, JDBCTableSource.getInputFormat explicitly writes:
>>>>>>>>>>> WHERE XXX BETWEEN ? AND ?. So we must use
>>>>>>>>>>> `NumericBetweenParametersProvider`.
>>>>>>>>>>> I don't think this is a good long-term solution.
>>>>>>>>>>> I think we should support filter push-down for JDBCTableSource,
>>>>>>>>>>> so that we can write the filters that we want. What do you think?
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Jingsong Lee
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Apr 21, 2020 at 10:00 PM Flavio Pompermaier <
>>>>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi all,
>>>>>>>>>>>> we have a use case where we have a prepared statement that we
>>>>>>>>>>>> parameterize using a custom parameters provider (similar to what
>>>>>>>>>>>> happens in
>>>>>>>>>>>> testJDBCInputFormatWithParallelismAndNumericColumnSplitting [1]).
>>>>>>>>>>>> How can we handle this using the JDBC table API?
>>>>>>>>>>>> What should we do to handle such a use case? Is there anyone
>>>>>>>>>>>> willing to mentor us in its implementation?
>>>>>>>>>>>>
>>>>>>>>>>>> Another question: why has flink-jdbc not been renamed to
>>>>>>>>>>>> flink-connector-jdbc?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks in advance,
>>>>>>>>>>>> Flavio
>>>>>>>>>>>>
>>>>>>>>>>>> [1]
>>>>>>>>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
>>>>>>>>>>>>
