Hi Henry,

1. Also take a look at the limitations of regular joins 
<https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#regular-joins>:

> However, this operation has an important implication: it requires to keep 
> both sides of the join input in Flink’s state forever. Thus, the resource 
> usage will grow indefinitely as well, if one or both input tables are 
> continuously growing.
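
As a rough illustration of that limitation, here is a minimal Python sketch of a non-windowed inner equi-join. It is a toy model, not Flink's implementation: the class `RegularJoin` and its methods are hypothetical names made up for this example. It only shows that every row from either side has to stay in state, so the state grows with the input.

```python
from collections import defaultdict


class RegularJoin:
    """Toy model of a non-windowed inner equi-join (hypothetical, not a Flink API)."""

    def __init__(self):
        # Every row ever seen on either side is retained, keyed by the join key.
        self.left_state = defaultdict(list)
        self.right_state = defaultdict(list)

    def on_left(self, key, row):
        # Store the row, then emit matches against everything seen on the right.
        self.left_state[key].append(row)
        return [(row, r) for r in self.right_state[key]]

    def on_right(self, key, row):
        # Symmetric case: a late right-side row must still find old left rows.
        self.right_state[key].append(row)
        return [(l, row) for l in self.left_state[key]]

    def state_size(self):
        # Total number of rows held in state across both sides.
        left = sum(len(rows) for rows in self.left_state.values())
        right = sum(len(rows) for rows in self.right_state.values())
        return left + right


join = RegularJoin()
join.on_right("EUR", {"rate": 114})
for i in range(1000):
    join.on_left("EUR", {"amount": i})
print(join.state_size())
```

Nothing is ever evicted here, which is the point: with a continuously growing input the state is unbounded unless a retention policy drops rows.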

4. Our current grammar for temporal table joins is a stopgap solution 
that is ANSI SQL compliant. Unfortunately, the SQL standard lags behind 
streaming requirements, and we are working on addressing this issue [1].

5. It will be executed much as you would expect a regular hash join to be 
executed: the “WHERE” join condition will be pushed into the temporal table 
join operator.

6. I don’t think that Flink supports the syntax suggested by Hequn. Currently 
outer joins are not supported with temporal tables.
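
Regarding point 5, the effect of pushing the equality condition into the operator can be sketched in Python as follows. This is only a conceptual model under my own assumptions: `TemporalJoin`, `on_rate` and `on_order` are hypothetical names, not Flink classes, and timestamps are plain integers. Each order looks up one version of one key, so no cross product is built before filtering.

```python
import bisect
from collections import defaultdict


class TemporalJoin:
    """Toy model of an inner temporal table join keyed by currency (hypothetical)."""

    def __init__(self):
        # Per currency: sorted version timestamps and the rates valid from them.
        self.times = defaultdict(list)
        self.rates = defaultdict(list)

    def on_rate(self, currency, time, rate):
        # Insert a new version, keeping the per-key history sorted by time.
        idx = bisect.bisect_right(self.times[currency], time)
        self.times[currency].insert(idx, time)
        self.rates[currency].insert(idx, rate)

    def on_order(self, currency, time, amount):
        # The join key selects a single history; the order time selects the
        # latest version that is not newer than the order.
        times = self.times.get(currency, [])
        idx = bisect.bisect_right(times, time)
        if idx == 0:
            return None  # inner join: no rate was valid at that time
        return amount * self.rates[currency][idx - 1]


tj = TemporalJoin()
tj.on_rate("Euro", 1, 114)
tj.on_rate("Euro", 5, 116)
tj.on_rate("Yen", 1, 1)

print(tj.on_order("Euro", 3, 2))  # uses the Euro rate valid from time 1
print(tj.on_order("Euro", 7, 2))  # uses the Euro rate valid from time 5
print(tj.on_order("Yen", 0, 50))  # no Yen rate existed yet at time 0
```

The orders themselves are never stored in this sketch, only the versions of the rates, which is why this kind of join is driven by one stream.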

Piotrek

[1] https://issues.apache.org/jira/browse/CALCITE-1917 


> On 14 Mar 2019, at 03:31, Hequn Cheng <chenghe...@gmail.com> wrote:
> 
> Hi Henry,
> 
> These are good questions! 
> I would rather not add the temporal and lateral prefixes in front of the 
> join. The temporal table is a concept orthogonal to the join. We should say 
> "join a temporal table" or "join a lateral table". 
> 1. You can of course use a stream-stream join. Introducing the temporal table 
> not only makes the query simpler but also improves performance. More 
> details can be found in [1].
> 2. Both joins are based on the concept of a temporal table, i.e., a table 
> joins a temporal table.
> 3. Yes, actually the join in Flink uses a lateral table and a 
> TemporalTableFunction to implement a temporal table. A temporal table is a 
> versioned table, and a lateral table is a table that keeps references to the 
> previous table. If you do not want to use time versioning, you don't need the 
> temporal table. 
> 4. It is a kind of join. The join keyword can be omitted if it is an inner 
> join. The grammar will not be changed in the near future. I haven't heard 
> any news about changing it.
> 5. Yes, it will be optimized. 
> 6. If you want to left join a temporal table, you can write SQL like:
> 
> SELECT
>   o.amount, o.currency, r.rate, o.amount * r.rate
> FROM
>   Orders AS o
>   LEFT JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
>   ON r.currency = o.currency
> 
> CC @Piotr Nowojski <pi...@data-artisans.com>: it would be great to have 
> your opinion here.
> 
> Best,
> Hequn
> 
> [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
> 
> 
> On Wed, Mar 13, 2019 at 1:59 PM 徐涛 <happydexu...@gmail.com> wrote:
> Hi Hequn,
>       Thanks a lot for your answer! It is very helpful.
>       I still have some questions about joining a stream with dimension data 
> and about the temporal table join:
>       1. I found that the temporal table join is still driven by one stream. 
> Why does the dimension data join have to be driven by one stream? Why can it 
> not be done as a two-stream join (a traditional stream-stream join)?
>               Let me try to answer this myself: a two-stream join works by 
> materializing the data of both streams in state, but because of state 
> retention, the dimension data may be lost. I guess this is one reason. Am I 
> correct?
>       2. Is Blink's stream and dimension data join based on the temporal 
> table join?
>       3. I think a lateral table join can also do a dimension join if I do not 
> want to use time versioning. How should I choose between a temporal table 
> join and a lateral table join?
>       4. I found that the temporal table join in Flink uses a “LATERAL TABLE” 
> grammar rather than “JOIN”. That is OK, but it is not as easy to use as 
> “JOIN”. Will the community modify the grammar in future releases?
>       5. In the following temporal table join statement, will joining the 
> Orders table with Rates produce too much data before the WHERE clause takes 
> effect? Will it be optimized?
> SELECT
>   o.amount * r.rate AS amount
> FROM
>   Orders AS o,
>   LATERAL TABLE (Rates(o.rowtime)) AS r
> WHERE r.currency = o.currency 
>       6. How can I use a temporal table join to do a left join?
> 
> 
> Best
> Henry
> 
>> On Mar 13, 2019, at 12:02 AM, Hequn Cheng <chenghe...@gmail.com> wrote:
>> 
>> Hi Henry,
>> 
>> Yes, you are correct. Basically, there are two ways to join a Temporal 
>> Table. One is provided in Flink, and the other is provided in Blink, which 
>> has been pushed as a branch [1] in the Flink repo.
>> 
>> - Join a Temporal Table in Flink[2][3][4]
>> As the documentation says, a join with a temporal table joins an 
>> append-only table (left input/probe side) with a temporal table (right 
>> input/build side), i.e., a table that changes over time and tracks its 
>> changes. You need to define a temporal table function, which is used to 
>> provide access to the state of a temporal table at a specific point in time. 
>> *Both rowtime and proctime are supported.* 
>> - Join a Temporal Table in Blink[5]
>> Unlike the join in Flink, it can join an *append/upsert/retract* 
>> stream (left input/probe side) with a temporal table (right input/build 
>> side), i.e., a *remote dimension table* that changes over time. In order to 
>> access data in a temporal table, you need to define a TableSource with 
>> LookupableTableSource [6]. (You can probably download the Blink code and 
>> take a look at `HBase143TableSource`, which is an implementation of 
>> LookupableTableSource.) Currently, only proctime is supported.
>> 
>> I think you can choose one according to your scenario.
>> There are some useful examples in the documents listed below. They may be 
>> very helpful for you. Feel free to ask if you have any other questions.
>> 
>> Best,
>> Hequn
>> 
>> [1] https://github.com/apache/flink/tree/blink
>> [2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/joins.html#join-with-a-temporal-table
>> [3] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html
>> [4] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/tableApi.html#joins
>> [5] https://flink-china.org/doc/blink/dev/table/streaming/joins.html#join-with-a-temporal-table
>> [6] https://flink-china.org/doc/blink/dev/table/sourcesinks#defining-a-tablesource-with-lookupable
>> 
>> On Tue, Mar 12, 2019 at 2:13 PM 徐涛 <happydexu...@gmail.com> wrote:
>> Hi Hequn,
>>      I want to join a stream with a dimension table in Flink SQL. I found 
>> that Flink 1.7 delivered a new feature named Temporal Tables, and I think it 
>> could be used to join a stream with a dimension table, but I am not sure 
>> about that. Could anyone help me with it? 
>>      Thanks a lot for your help.
>> 
>> Best 
>> Henry
>> 
>>> On Sep 26, 2018, at 12:16 AM, Hequn Cheng <chenghe...@gmail.com> wrote:
>>> 
>>> Hi vino,
>>> 
>>> Thanks for sharing the link. It's a great book and I will take a look. 
>>> There are different kinds of joins, and different joins have different 
>>> semantics. From the link, I think it refers to the time-versioned join. 
>>> FLINK-9712 <https://issues.apache.org/jira/browse/FLINK-9712> covers 
>>> enrichment joins with Time Versioned Functions, and the result is 
>>> deterministic under event time.
>>> 
>>> Best, Hequn
>>> 
>>> On Tue, Sep 25, 2018 at 11:05 PM vino yang <yanghua1...@gmail.com> wrote:
>>> Hi Hequn,
>>> 
>>> The book does not draw a right-or-wrong conclusion, but it illustrates 
>>> this phenomenon: when two streams with the same input are replayed and 
>>> joined, the join results are nondeterministic because of the order of 
>>> events. This is because the two streams can be interleaved in different 
>>> ways. This has nothing to do with orderBy; it simply exists in the 
>>> stream-stream join. Of course, this phenomenon is only notable in 
>>> comparison with a non-stream join.
>>> 
>>> In addition, I recommend this book, which is very famous on Twitter and 
>>> Amazon. Because you are also Chinese, there is a good translation here. If 
>>> I guess correctly, the main translator is also from your company. The 
>>> part I mentioned is here [1].
>>> 
>>> [1]: https://github.com/Vonng/ddia/blob/master/ch11.md#%E8%BF%9E%E6%8E%A5%E7%9A%84%E6%97%B6%E9%97%B4%E4%BE%9D%E8%B5%96%E6%80%A7
>>> 
>>> Thanks, vino.
>>> 
>>> On Tue, Sep 25, 2018 at 9:45 PM Hequn Cheng <chenghe...@gmail.com> wrote:
>>> Hi vino,
>>> 
>>> There are no ordering problems with the stream-stream join in Flink. No 
>>> matter in what order the elements arrive, the stream-stream join in Flink 
>>> will output results that are consistent with standard SQL semantics. I 
>>> haven't read the book you mentioned. A join doesn't guarantee output order. 
>>> You have to use orderBy if you want ordered results.
>>> 
>>> Best, Hequn
>>> 
>>> On Tue, Sep 25, 2018 at 8:36 PM vino yang <yanghua1...@gmail.com> wrote:
>>> Hi Fabian,
>>> 
>>> I may not have stated it clearly: there is no semantic problem at the 
>>> Flink implementation level. Rather, there may be “time-dependence” here [1].
>>> 
>>> Yes, my initial answer was not to use this form of join in this scenario, 
>>> but Henry said he converted the table into a stream table and asked about 
>>> the feasibility of other methods.
>>> 
>>> [1]: “Designing Data-Intensive Applications” by Martin Kleppmann, Part 3: 
>>> Derived Data, Chapter 11: Stream Processing, Stream Joins.
>>> 
>>> An excerpt:
>>> If the ordering of events across streams is undetermined, the join becomes 
>>> nondeterministic [87], which means you cannot rerun the same job on the 
>>> same input and necessarily get the same result: the events on the input 
>>> streams may be interleaved in a different way when you run the job again. 
>>> 
>>> 
>>> 
>>> On Tue, Sep 25, 2018 at 8:08 PM Fabian Hueske <fhue...@gmail.com> wrote:
>>> Hi,
>>> 
>>> I don't think that using the current join implementation in the Table API / 
>>> SQL will work.
>>> The non-windowed join fully materializes *both* input tables in state. This 
>>> is necessary because the join needs to be able to process updates on 
>>> either side.
>>> While this is not a problem for the fixed-size MySQL table, materializing 
>>> the append-only table (aka stream) is probably not what you want.
>>> You also cannot limit idle state retention, because that would remove the 
>>> MySQL table from state at some point.
>>> 
>>> The only way to make it work is using a user-defined TableFunction that 
>>> queries the MySQL table via JDBC. 
>>> However, please note that these calls would be synchronous, blocking calls.
>>> 
>>> @Vino: Why do you think that the stream & stream join is not mature and 
>>> which problems do you see in the semantics? 
>>> The semantics are correct (standard SQL semantics) and in my opinion the 
>>> implementation is also mature.
>>> However, you should not use the non-windowed join if any of the input 
>>> tables is ever-growing, because both sides must be held in state. This is 
>>> not an issue of the semantics.
>>> 
>>> Cheers,
>>> Fabian
>>> 
>>> On Tue, Sep 25, 2018 at 2:00 PM vino yang <yanghua1...@gmail.com> wrote:
>>> Hi Henry,
>>> 
>>> 1) I don't really recommend this method, but you said that you expect to 
>>> convert the MySQL table to a stream and then to a Flink table. Under this 
>>> premise, I said that you can do this by joining two stream tables. But as 
>>> you know, this join depends on how long the state is retained. To make 
>>> it equivalent to a dimension table, you must permanently keep the state of 
>>> the stream table that is defined as the "dimension table". I only meant 
>>> that this can be done by modifying the relevant configuration in Flink, not 
>>> that it can be set for a single table.
>>> 
>>> 2) Imagine that there are one million records in two tables. The records 
>>> of both tables are just beginning to stream into Flink, and the records of 
>>> the dimension table have not fully arrived. Therefore, your matching 
>>> results may not be as accurate as directly querying MySQL.
>>> 
>>> In fact, the current stream-stream join is not very mature, and there are 
>>> some problems in its semantics. I personally recommend that you go back to 
>>> a stream/batch (MySQL) join. For more on the principles, I recommend 
>>> reading a book commonly referred to as DDIA.
>>> 
>>> Thanks, vino.
>>> 
>>> On Tue, Sep 25, 2018 at 5:48 PM 徐涛 <happydexu...@gmail.com> wrote:
>>> Hi Vino,
>>>     I do not quite understand some of the sentences below. Would you please 
>>> explain them in a bit more detail?
>>>     1. “such as setting the state retention time of one of the tables to be 
>>> permanent”: as far as I know, the state retention time is a global config, 
>>> and I cannot set this property per table.
>>>     2. “you may not be able to match the results, because the data 
>>> belonging to the mysql table is just beginning to play as a stream”: why is 
>>> it not able to match the results?
>>> 
>>> Best
>>> Henry
>>> 
>>>> On Sep 25, 2018, at 5:29 PM, vino yang <yanghua1...@gmail.com> wrote:
>>>> 
>>>> Hi Henry,
>>>> 
>>>> If you have converted the MySQL table to a Flink stream table, a 
>>>> stream-stream join in Flink Table/SQL can also do this, such as setting the 
>>>> state retention time of one of the tables to be permanent. But when the 
>>>> job has just started, you may not be able to match the results, because the 
>>>> data belonging to the mysql table is just beginning to play as a stream.
>>>> 
>>>> Thanks, vino.
>>>> 
>>>> On Tue, Sep 25, 2018 at 5:10 PM 徐涛 <happydexu...@gmail.com> wrote:
>>>> Hi Vino & Hequn,
>>>>    I am now using the Table/SQL API. If I import the MySQL table as a 
>>>> stream and then convert it into a table, it seems that this can also be a 
>>>> workaround for batch/streaming joins. May I ask how it differs from the 
>>>> UDTF method? Does this implementation have any defects?
>>>>    
>>>> Best
>>>> Henry
>>>> 
>>>>> On Sep 22, 2018, at 10:28 AM, Hequn Cheng <chenghe...@gmail.com> wrote:
>>>>> 
>>>>> Hi
>>>>> 
>>>>> +1 for vino's answer. 
>>>>> Also, this kind of join will be supported in FLINK-9712 
>>>>> <https://issues.apache.org/jira/browse/FLINK-9712>. You can check more 
>>>>> details in the jira.
>>>>> 
>>>>> Best, Hequn
>>>>> 
>>>>> On Fri, Sep 21, 2018 at 4:51 PM vino yang <yanghua1...@gmail.com> wrote:
>>>>> Hi Henry,
>>>>> 
>>>>> There are three ways I can think of:
>>>>> 
>>>>> 1) use DataStream API, implement a flatmap UDF to access dimension table;
>>>>> 2) use table/sql API, implement a UDTF to access dimension table;
>>>>> 3) customize the table/sql join API/statement's implementation (and 
>>>>> change the physical plan)
>>>>> 
>>>>> Thanks, vino.
>>>>> 
>>>>> On Fri, Sep 21, 2018 at 4:43 PM 徐涛 <happydexu...@gmail.com> wrote:
>>>>> Hi All,
>>>>>         Sometimes a “dimension table” needs to be joined with the “fact 
>>>>> table” if the data are not joined before being sent to Kafka.
>>>>>         So if the data are joined in Flink, does the “dimension table” 
>>>>> have to be imported as a stream, or are there other ways to achieve 
>>>>> this?
>>>>>         Thanks a lot!
>>>>> 
>>>>> Best
>>>>> Henry
>>>> 
>>> 
>> 
> 
