Hi Fabian,

I may not have stated it clearly: there is no semantic problem at the
Flink implementation level. Rather, there may be a “time dependence” here. [1]

Yes, my initial answer was not to use this form of join in this scenario,
but Henry said he converted the table into a stream table and asked about
the feasibility of other methods.

[1]: "Designing Data-Intensive Applications" by Martin Kleppmann, Part 3:
Derived Data, Chapter 11: Stream Processing, Stream Joins.

An excerpt:

*If the ordering of events across streams is undetermined, the join becomes
nondeterministic [87], which means you cannot rerun the same job on the
same input and necessarily get the same result: the events on the input
streams may be interleaved in a different way when you run the job again.*
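
To illustrate the time dependence described in the excerpt, here is a minimal
Java sketch. It is a simplified model of an enrichment-style join, not Flink's
join implementation; the class InterleavingDemo, the Event type, and the sample
keys and values are all hypothetical. Each fact event is matched against
whatever dimension rows have arrived so far, so the same input records can
produce different output under a different interleaving.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of an enrichment join (not Flink code).
class InterleavingDemo {

    /** One input record: either a dimension row or a fact event. */
    static final class Event {
        final boolean isDimension;
        final String key;
        final String value;

        Event(boolean isDimension, String key, String value) {
            this.isDimension = isDimension;
            this.key = key;
            this.value = value;
        }

        static Event dim(String key, String value) {
            return new Event(true, key, value);
        }

        static Event fact(String key, String value) {
            return new Event(false, key, value);
        }
    }

    /**
     * Processes events in arrival order. A dimension row overwrites the
     * state for its key; a fact is joined with the state as it is at that
     * moment (null if no dimension row has arrived yet).
     */
    static List<String> enrich(List<Event> arrivalOrder) {
        Map<String, String> dimensionState = new HashMap<>();
        List<String> output = new ArrayList<>();
        for (Event e : arrivalOrder) {
            if (e.isDimension) {
                dimensionState.put(e.key, e.value);
            } else {
                output.add(e.value + "->" + dimensionState.get(e.key));
            }
        }
        return output;
    }

    public static void main(String[] args) {
        // Same four records, two different interleavings.
        List<Event> dimensionFirst = Arrays.asList(
                Event.dim("u1", "Berlin"),
                Event.fact("u1", "order-1"),
                Event.dim("u1", "Munich"),
                Event.fact("u1", "order-2"));
        List<Event> factFirst = Arrays.asList(
                Event.fact("u1", "order-1"),
                Event.dim("u1", "Berlin"),
                Event.dim("u1", "Munich"),
                Event.fact("u1", "order-2"));

        System.out.println(enrich(dimensionFirst)); // [order-1->Berlin, order-2->Munich]
        System.out.println(enrich(factFirst));      // [order-1->null, order-2->Munich]
    }
}
```

With the first ordering, order-1 is enriched with Berlin; with the second it
finds no dimension row yet, although both runs consume the same four records.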


Fabian Hueske <fhue...@gmail.com> wrote on Tue, Sep 25, 2018 at 8:08 PM:

> Hi,
>
> I don't think that using the current join implementation in the Table API
> / SQL will work.
> The non-windowed join fully materializes *both* input tables in state.
> This is necessary, because the join needs to be able to process updates on
> either side.
> While this is not a problem for the fixed-size MySQL table, materializing
> the append-only table (aka stream) is probably not what you want.
> You also cannot limit idle state retention because it would remove the
> MySQL table from state at some point.
>
> The only way to make it work is using a user-defined TableFunction that
> queries the MySQL table via JDBC.
> However, please note that these calls would be synchronous, blocking calls.
>
> @Vino: Why do you think that the stream & stream join is not mature and
> which problems do you see in the semantics?
> The semantics are correct (standard SQL semantics) and in my opinion the
> implementation is also mature.
> However, you should not use the non-windowed join if any of the input
> tables is ever-growing because both sides must be held in state. This is
> not an issue of the semantics.
>
> Cheers,
> Fabian
>
> On Tue, Sep 25, 2018 at 2:00 PM, vino yang <
> yanghua1...@gmail.com> wrote:
>
>> Hi Henry,
>>
>> 1) I don't particularly recommend this method, but you said that you want
>> to convert the MySQL table to a stream and then to a Flink table. Under this
>> premise, I said that you can do this by joining two stream tables. But as
>> you know, this join depends on how long the state is retained.
>> To make it equivalent to a dimension table, you must permanently retain the
>> state of the stream table that serves as the "dimension table." I only
>> said that this can be done by modifying the relevant configuration in Flink,
>> not that it can be set for a single table.
>>
>> 2) Imagine that each of the two tables has one million records. The records
>> of both tables have only just started streaming into Flink, and the records
>> of the dimension table have not fully arrived. Therefore, your matching
>> results may not be as accurate as directly querying MySQL.
>>
>> In fact, the current stream & stream join is not very mature and there are
>> some problems in its semantics, so I personally recommend that you go back
>> to a stream/batch (MySQL) join. For more on the principles, I recommend
>> reading the book referred to as "DDIA".
>>
>> Thanks, vino.
>>
>> 徐涛 <happydexu...@gmail.com> wrote on Tue, Sep 25, 2018 at 5:48 PM:
>>
>>> Hi Vino,
>>> I do not quite understand some of the sentences below; would you please
>>> explain them in a bit more detail?
>>> 1. “*such as setting the state retention time of one of the tables to
>>> be permanent*”: as far as I know, the state retention time is a global
>>> config, and I cannot set this property per table.
>>> 2. “*you may not be able to match the results, because the data
>>> belonging to the mysql table is just beginning to play as a stream*”:
>>> why would it not be able to match the results?
>>>
>>> Best
>>> Henry
>>>
>>> On Sep 25, 2018, at 5:29 PM, vino yang <yanghua1...@gmail.com> wrote:
>>>
>>> Hi Henry,
>>>
>>> If you have converted the mysql table to a Flink stream table, then in Flink
>>> table/sql a stream & stream join can also do this, such as setting the
>>> state retention time of one of the tables to be permanent. But when the job
>>> has just started running, you may not be able to match the results, because
>>> the data belonging to the mysql table is just beginning to play as a stream.
>>>
>>> Thanks, vino.
>>>
>>> 徐涛 <happydexu...@gmail.com> wrote on Tue, Sep 25, 2018 at 5:10 PM:
>>>
>>>> Hi Vino & Hequn,
>>>> I am now using the table/sql API. If I import the mysql table as a
>>>> stream and then convert it into a table, it seems that this can also be a
>>>> workaround for batch/streaming joining. May I ask how it differs from
>>>> the UDTF method? Does this implementation have any defects?
>>>> Best
>>>> Henry
>>>>
>>>> On Sep 22, 2018, at 10:28 AM, Hequn Cheng <chenghe...@gmail.com> wrote:
>>>>
>>>> Hi
>>>>
>>>> +1 for vino's answer.
>>>> Also, this kind of join will be supported in FLINK-9712
>>>> <https://issues.apache.org/jira/browse/FLINK-9712>. You can check more
>>>> details in the jira.
>>>>
>>>> Best, Hequn
>>>>
>>>> On Fri, Sep 21, 2018 at 4:51 PM vino yang <yanghua1...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Henry,
>>>>>
>>>>> There are three ways I can think of:
>>>>>
>>>>> 1) use the DataStream API and implement a flatMap UDF to access the
>>>>> dimension table;
>>>>> 2) use the table/sql API and implement a UDTF to access the dimension table;
>>>>> 3) customize the implementation of the table/sql join API/statement (and
>>>>> change the physical plan).
>>>>>
>>>>> Thanks, vino.
>>>>>
>>>>> 徐涛 <happydexu...@gmail.com> wrote on Fri, Sep 21, 2018 at 4:43 PM:
>>>>>
>>>>>> Hi All,
>>>>>>         Sometimes a “dimension table” needs to be joined with the
>>>>>> "fact table", if the data are not joined before being sent to Kafka.
>>>>>>         So if the data are joined in Flink, does the “dimension
>>>>>> table” have to be imported as a stream, or are there other ways to
>>>>>> achieve it?
>>>>>>         Thanks a lot!
>>>>>>
>>>>>> Best
>>>>>> Henry
>>>>>
>>>>>
>>>>
>>>
