> Specify "query" and "provider"
Yes, your proposal looks reasonable to me.
Key can be "scan.***" like in [1].

> specify parameters
Maybe we need add something like "scan.parametervalues.provider.type", it
can be "bound, specify, custom":
- when bound, using old partitionLowerBound
and partitionUpperBound, numPartitions
- when specify, using specify parameters like your proposal
- when custom, need "scan.parametervalues.provider.class"

> not implement FiltertableTableSource
Just because we have no time to finish it.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory

Best,
Jingsong Lee

On Wed, Apr 22, 2020 at 9:49 PM Flavio Pompermaier <pomperma...@okkam.it>
wrote:

> Ok, now I understand your proposal. However this looks like a workaround
> to me..I want to be able to give a name to such a table and register also
> to a catalog if I want.
> Indeed my proposal is to add a "*connector.read.query*" as an alternative
> to "connector.table" (that forces you to map tables as 1-to-1).
> Then we can add a *connector.read.parametervalues.provider.class* in
> order to customize the splitting of the query (we can also add a check that
> the query contains at least 1 question mark).
> If we introduce a custom parameters provider we need also to specify
> parameters, using something like:
>
> *'connector.read.parametervalues.0.name
> <http://connector.read.parametervalues.0.name>*' = 'minDate',
> *'connector.read.parametervalues.0.value'*= '12/10/2019'
> *'connector.read.parametervalues.1.name
> <http://connector.read.parametervalues.1.name>*' = 'maxDate',
> *'connector.read.parametervalues.1.value*'= '01/01/2020'
>
> Another question: why JDBC table source does not implement
> *FilterableTableSource?*
>
> On Wed, Apr 22, 2020 at 3:27 PM Jingsong Li <jingsongl...@gmail.com>
> wrote:
>
>> Hi,
>>
>> Requirements: read data from "SELECT public.A.x, public.B.y FROM public.A
>> JOIN public.B on public.A.pk <http://public.a.pk/> = public.B.fk
>> <http://public.b.fk/>"
>>
>> Solution: table name = "(SELECT public.A.x, public.B.y FROM public.A JOIN
>> public.B on public.A.pk <http://public.a.pk/> = public.B.fk
>> <http://public.b.fk/>)"
>>
>> I don't why there's a 1-to-1 mapping between a Flink table and a JDBC
>> table. If it is, there is no way support this requirement because this
>> flink table is come from two jdbc tables.
>>
>> Best,
>> Jingsong Lee
>>
>> On Wed, Apr 22, 2020 at 8:42 PM Flavio Pompermaier <pomperma...@okkam.it>
>> wrote:
>>
>>> Sorry Jingsong but I have to clarify this thing, which is not clear at
>>> all to me.
>>>
>>> From what I can see from the documentation of table API there's no way
>>> (currently) to associate an SQL query to a Flink Table, there's a 1-to-1
>>> mapping between a Flink table and a JDBC table.
>>> This means that, at the moment, if I want to join 2 tables from the same
>>> JDBC source (like in the example) Flink would fetch all the data of the 2
>>> tables and then it will do the join, it will not execute the query directly
>>> and get results back. Right?
>>> If this is the case we could open an issue in the Blink optimizer that
>>> could improve performance if the query that involves a single JDBC source
>>> is executed directly to the database. and that's one point.
>>> Or maybe this is what you were trying to say with "Which means the
>>> "select ..." is dynamically generated by the Flink sql. We can not set it
>>> static."? Does it mean that we can't specify a query in a JDBC table?
>>> This sounds to go against what you write in the statement before: So
>>> this table name can be a rich sql: "(SELECT public.A.x, public.B.y FROM
>>> public.A JOIN public.B on public.A.pk <http://public.a.pk/> =
>>> public.B.fk <http://public.b.fk/>)"
>>>
>>> I didn't understand what's your proposals here..I see two issues:
>>>
>>>    1. If a JDBC table is mapped 1-to-1 with a JDBC table, are queries
>>>    pushed down in a performant way?
>>>       1. i.e. SELECT public.A.x, public.B.y FROM public.A JOIN public.B
>>>       on public.A.pk <http://public.a.pk/> = public.B.fk
>>>       <http://public.b.fk/> is performed efficiently to the DB or is it
>>>       performed in Flink after reading all the tables data?
>>>       2. Add a way to handle custom parameter value provider class and
>>>    query statements. What is exactly your proposal here?
>>>
>>>
>>> On Wed, Apr 22, 2020 at 1:03 PM Jingsong Li <jingsongl...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> You can configure table name for JDBC source.
>>>> So this table name can be a rich sql: "(SELECT public.A.x, public.B.y
>>>> FROM public.A JOIN public.B on public.A.pk <http://public.a.pk/> =
>>>> public.B.fk <http://public.b.fk/>)"
>>>> So the final scan query statement will be: "select ... from (SELECT
>>>> public.A.x, public.B.y FROM public.A JOIN public.B on public.A.pk
>>>> <http://public.a.pk/> = public.B.fk <http://public.b.fk/>) where ..."
>>>>
>>>> Why not use this rich sql to scan query statement? Because we have
>>>> implemented the project pushdown [1] in JDBCTableSource.
>>>> Which means the "select ..." is dynamically generated by the Flink sql.
>>>> We can not set it static.
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#defining-a-tablesource-with-projection-push-down
>>>>
>>>> Best,
>>>> Jingsong Lee
>>>>
>>>> On Wed, Apr 22, 2020 at 6:49 PM Flavio Pompermaier <
>>>> pomperma...@okkam.it> wrote:
>>>>
>>>>> Sorry Jingsong but I didn't understand your reply..Can you better
>>>>> explain the following sentences please? Probably I miss some Table API
>>>>> background here (I used only JDBOutputFormat).
>>>>> "We can not use a simple "scan.query.statement", because in
>>>>> JDBCTableSource, it also deal with project pushdown too. Which means that
>>>>> the select part can not be modified casual.
>>>>> Maybe you can configure a rich table name for this."
>>>>>
>>>>> I can take care of opening tickets but I need to understand exactly
>>>>> how many and I need to be sure of explaining the problem with the correct
>>>>> terms.
>>>>>
>>>>> Best,
>>>>> Flavio
>>>>>
>>>>> On Wed, Apr 22, 2020 at 11:52 AM Jingsong Li <jingsongl...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks for the explanation.
>>>>>> You can create JIRA for this.
>>>>>>
>>>>>> For "SELECT public.A.x, public.B.y FROM public.A JOIN public.B on
>>>>>> public.A.pk <http://public.a.pk/> = public.B.fk <http://public.b.fk/>.
>>>>>> "
>>>>>> We can not use a simple "scan.query.statement", because in
>>>>>> JDBCTableSource, it also deal with project pushdown too. Which means that
>>>>>> the select part can not be modified casual.
>>>>>> Maybe you can configure a rich table name for this.
>>>>>>
>>>>>> Best,
>>>>>> Jingsong Lee
>>>>>>
>>>>>> On Wed, Apr 22, 2020 at 5:24 PM Flavio Pompermaier <
>>>>>> pomperma...@okkam.it> wrote:
>>>>>>
>>>>>>> Because in my use case the parallelism was not based on a range on
>>>>>>> keys/numbers but on a range of dates, so I needed a custom Parameter
>>>>>>> Provider.
>>>>>>> For what regards pushdown I don't know how Flink/Blink currently
>>>>>>> works..for example, let's say I have a Postgres catalog containing 2 
>>>>>>> tables
>>>>>>> (public.A and public.B).
>>>>>>> If I do the following query : SELECT public.A.x, public.B.y FROM
>>>>>>> public.A JOIN public.B on public.A.pk = public.B.fk.
>>>>>>> Will this be pushdown as a single query or will Flink fetch both
>>>>>>> tables and the perform the join?
>>>>>>> Talking with Bowen I understood that to avoid this I could define a
>>>>>>> VIEW in the db (but this is not alway possible) or in Flink (but from 
>>>>>>> what
>>>>>>> I know this is very costly).
>>>>>>> In this case a parameter "scan.query.statement" without a
>>>>>>> "scan.parameter.values.provider.class" is super helpful and could 
>>>>>>> improve
>>>>>>> performance a lot!
>>>>>>>
>>>>>>> On Wed, Apr 22, 2020 at 11:06 AM Jingsong Li <jingsongl...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> You are right about the lower and upper, it is a must to
>>>>>>>> parallelize the fetch of the data.
>>>>>>>> And filter pushdown is used to filter more data at JDBC server.
>>>>>>>>
>>>>>>>> Yes, we can provide "scan.query.statement" and
>>>>>>>> "scan.parameter.values.provider.class" for jdbc connector. But maybe we
>>>>>>>> need be careful about this too flexible API.
>>>>>>>>
>>>>>>>> Can you provide more about your case? Why can not been solved by
>>>>>>>> lower and upper with filter pushdown?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Jingsong Lee
>>>>>>>>
>>>>>>>> On Wed, Apr 22, 2020 at 4:45 PM Flavio Pompermaier <
>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>
>>>>>>>>> Maybe I am wrong but support pushdown for JDBC is one thing (that
>>>>>>>>> is probably useful) while parameters providers are required if you 
>>>>>>>>> want to
>>>>>>>>> parallelize the fetch of the data.
>>>>>>>>> You are not mandated to use NumericBetweenParametersProvider, you
>>>>>>>>> can use the ParametersProvider you prefer, depending on the statement 
>>>>>>>>> you
>>>>>>>>> have.
>>>>>>>>> Or do you have in mind something else?
>>>>>>>>>
>>>>>>>>> On Wed, Apr 22, 2020 at 10:33 AM Jingsong Li <
>>>>>>>>> jingsongl...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> Now in JDBCTableSource.getInputFormat, It's written explicitly:
>>>>>>>>>> WHERE XXX BETWEEN ? AND ?. So we must use
>>>>>>>>>> `NumericBetweenParametersProvider`.
>>>>>>>>>> I don't think this is a good and long-term solution.
>>>>>>>>>> I think we should support filter push-down for JDBCTableSource,
>>>>>>>>>> so in this way, we can write the filters that we want, what do you 
>>>>>>>>>> think?
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Jingsong Lee
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Apr 21, 2020 at 10:00 PM Flavio Pompermaier <
>>>>>>>>>> pomperma...@okkam.it> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi all,
>>>>>>>>>>> we have a use case where we have a prepared statement that we
>>>>>>>>>>> parameterize using a custom parameters provider (similar to what 
>>>>>>>>>>> happens in
>>>>>>>>>>> testJDBCInputFormatWithParallelismAndNumericColumnSplitting[1]).
>>>>>>>>>>> How can we handle this using the JDBC table API?
>>>>>>>>>>> What should we do to handle such a use case? Is there anyone
>>>>>>>>>>> willing to mentor us in its implementation?
>>>>>>>>>>>
>>>>>>>>>>> Another question: why flink-jdbc has not been renamed to
>>>>>>>>>>> flink-connector-jdbc?
>>>>>>>>>>>
>>>>>>>>>>> Thanks in advance,
>>>>>>>>>>> Flavio
>>>>>>>>>>>
>>>>>>>>>>> [1]
>>>>>>>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Best, Jingsong Lee
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Best, Jingsong Lee
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best, Jingsong Lee
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Best, Jingsong Lee
>>>>
>>>
>>
>> --
>> Best, Jingsong Lee
>>
>
>

-- 
Best, Jingsong Lee

Reply via email to