Hi Fabian, I may not have stated it clearly: there is no semantic problem at the Flink implementation level. Rather, there may be "time-dependence" here. [1]
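The time-dependence in question can be shown with a tiny simulation. This is plain Python, not Flink code, and every name in it is invented for illustration: each "order" event is joined with the latest "rate" event seen so far, so two interleavings of the same input events yield different join results.

```python
# Sketch: why an unordered stream-stream join is time-dependent.
# Joining each order with the most recent rate seen so far means the
# result depends on how the two streams happen to interleave.

def join_with_latest_rate(events):
    """events: list of ('rate', value) or ('order', amount) tuples,
    already interleaved into a single arrival order."""
    latest_rate = None
    results = []
    for kind, value in events:
        if kind == 'rate':
            latest_rate = value          # update the join state
        else:
            results.append((value, latest_rate))  # join order with current rate
    return results

# Same four events, two different interleavings:
run_a = join_with_latest_rate([('rate', 1.1), ('order', 100),
                               ('rate', 1.2), ('order', 200)])
run_b = join_with_latest_rate([('rate', 1.1), ('rate', 1.2),
                               ('order', 100), ('order', 200)])
# run_a -> [(100, 1.1), (200, 1.2)]
# run_b -> [(100, 1.2), (200, 1.2)]
```

Rerunning "the same job on the same input" with a different interleaving changes the output, which is exactly the nondeterminism the DDIA passage below describes.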
Yes, my initial answer was not to use this form of join in this scenario, but Henry said he had converted the table into a stream table and asked about the feasibility of other methods.

[1]: "Designing Data-Intensive Applications" by Martin Kleppmann, Part 3: Derived Data, Chapter 11: Stream Processing, "Stream Joins": *If the ordering of events across streams is undetermined, the join becomes nondeterministic [87], which means you cannot rerun the same job on the same input and necessarily get the same result: the events on the input streams may be interleaved in a different way when you run the job again.*

Fabian Hueske <fhue...@gmail.com> wrote on Tue, Sep 25, 2018 at 8:08 PM:

> Hi,
>
> I don't think that using the current join implementation in the Table API / SQL will work.
> The non-windowed join fully materializes *both* input tables in state.
> This is necessary because the join needs to be able to process updates on either side.
> While this is not a problem for the fixed-size MySQL table, materializing the append-only table (aka the stream) is probably not what you want.
> You can also not limit idle state retention, because it would remove the MySQL table from state at some point.
>
> The only way to make it work is using a user-defined TableFunction that queries the MySQL table via JDBC.
> However, please note that these calls would be synchronous, blocking calls.
>
> @Vino: Why do you think that the stream & stream join is not mature, and which problems do you see in the semantics?
> The semantics are correct (standard SQL semantics), and in my opinion the implementation is also mature.
> However, you should not use the non-windowed join if any of the input tables is ever-growing, because both sides must be held in state. This is not an issue of the semantics.
>
> Cheers,
> Fabian
>
> On Tue, Sep 25, 2018 at 2:00 PM, vino yang <yanghua1...@gmail.com> wrote:
>
>> Hi Henry,
>>
>> 1) I don't recommend this method very much, but you said that you expect to convert the MySQL table to a stream and then to a Flink table. Under this premise, I said that you can do this by joining two stream tables. But as you know, this join depends on the time period for which the state is kept. To make it equivalent to a dimension table, you must permanently keep the state of the stream table that is defined as the "dimension table". I just said that modifying the relevant configuration in Flink can do this, not for a single table.
>>
>> 2) Imagine that there are one million records in the two tables. The records in both tables are just beginning to stream into Flink, and the records of the dimension table have not yet fully arrived. Therefore, your matching results may not be as accurate as directly querying MySQL.
>>
>> In fact, the current stream & stream join is not very mature; there are some problems in the semantics. I personally recommend that you return to the stream/batch (MySQL) join. For more on the underlying principles, I recommend the book "Designing Data-Intensive Applications" (DDIA).
>>
>> Thanks, vino.
>>
>> 徐涛 <happydexu...@gmail.com> wrote on Tue, Sep 25, 2018 at 5:48 PM:
>>
>>> Hi Vino,
>>> I do not quite understand some of the sentences below; would you please explain them in a bit more detail?
>>> 1. "such as setting the state retention time of one of the tables to be permanent": as far as I know, the state retention time is a global config; I cannot set this property per table.
>>> 2. "you may not be able to match the results, because the data belonging to the mysql table is just beginning to play as a stream": why is it not able to match the results?
>>>
>>> Best,
>>> Henry
>>>
>>> On Sep 25, 2018, at 5:29 PM, vino yang <yanghua1...@gmail.com> wrote:
>>>
>>> Hi Henry,
>>>
>>> Suppose you have converted the MySQL table to a Flink stream table.
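The warm-up problem vino describes in point 2) above (fact records arriving before their matching dimension rows have been replayed) can be sketched as a tiny simulation. Again this is illustrative Python with invented names, not Flink API:

```python
# Sketch: joining against a dimension table that is itself being replayed
# as a stream. Facts that arrive before their dimension row see no match,
# unlike a direct query against the complete MySQL table.

def stream_join(interleaved):
    """interleaved: list of ('dim', key, value) or ('fact', key) events."""
    dim_state = {}   # dimension rows materialized so far
    out = []
    for ev in interleaved:
        if ev[0] == 'dim':
            dim_state[ev[1]] = ev[2]                 # dimension row arrives
        else:
            out.append((ev[1], dim_state.get(ev[1])))  # None = no match yet
    return out

early_run = stream_join([('fact', 2), ('dim', 2, 'books'), ('fact', 2)])
# early_run -> [(2, None), (2, 'books')]
# The first fact for key 2 misses because its dimension row had not arrived.
```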
>>> In Flink Table/SQL, a stream & stream join can also achieve this, such as by setting the state retention time of one of the tables to be permanent. But when the job has just started, you may not be able to match the results, because the data belonging to the MySQL table is only just beginning to be replayed as a stream.
>>>
>>> Thanks, vino.
>>>
>>> 徐涛 <happydexu...@gmail.com> wrote on Tue, Sep 25, 2018 at 5:10 PM:
>>>
>>>> Hi Vino & Hequn,
>>>> I am now using the Table/SQL API. If I import the MySQL table as a stream and then convert it into a table, it seems that it can also be a workaround for batch/stream joining. May I ask what the difference is compared with the UDTF method? Does this implementation have any defects?
>>>>
>>>> Best,
>>>> Henry
>>>>
>>>> On Sep 22, 2018, at 10:28 AM, Hequn Cheng <chenghe...@gmail.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> +1 for vino's answer.
>>>> Also, this kind of join will be supported in FLINK-9712 <https://issues.apache.org/jira/browse/FLINK-9712>. You can check more details in the jira.
>>>>
>>>> Best, Hequn
>>>>
>>>> On Fri, Sep 21, 2018 at 4:51 PM, vino yang <yanghua1...@gmail.com> wrote:
>>>>
>>>>> Hi Henry,
>>>>>
>>>>> There are three ways I can think of:
>>>>>
>>>>> 1) use the DataStream API and implement a flatmap UDF to access the dimension table;
>>>>> 2) use the Table/SQL API and implement a UDTF to access the dimension table;
>>>>> 3) customize the implementation of the Table/SQL join API/statement (and change the physical plan).
>>>>>
>>>>> Thanks, vino.
>>>>>
>>>>> 徐涛 <happydexu...@gmail.com> wrote on Fri, Sep 21, 2018 at 4:43 PM:
>>>>>
>>>>>> Hi All,
>>>>>> Sometimes a "dimension table" needs to be joined with the "fact table", if the data are not joined before being sent to Kafka.
>>>>>> So if the data are joined in Flink, does the "dimension table" have to be imported as a stream, or are there other ways to achieve it?
>>>>>> Thanks a lot!
>>>>>>
>>>>>> Best,
>>>>>> Henry
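For reference, the lookup-style enrichment that both Fabian (a TableFunction querying MySQL via JDBC) and vino (a flatmap UDF) suggest can be sketched roughly as follows. This is a plain-Python stand-in, with a dict in place of the JDBC-backed MySQL table, and all names are hypothetical; a real Flink job would put this logic in a RichFlatMapFunction or user-defined TableFunction holding a JDBC connection.

```python
# Sketch of the per-record dimension lookup: enrich each fact record with
# a dimension row fetched on demand, with a small cache to reduce lookups.
# DIMENSION_TABLE stands in for the MySQL table reached over JDBC.

DIMENSION_TABLE = {1: 'electronics', 2: 'books'}

class DimensionLookup:
    def __init__(self, fetch):
        self._fetch = fetch   # in a real job: a synchronous, blocking JDBC query
        self._cache = {}      # avoids re-querying hot keys

    def enrich(self, fact):
        key = fact['category_id']
        if key not in self._cache:
            self._cache[key] = self._fetch(key)
        return {**fact, 'category': self._cache[key]}

lookup = DimensionLookup(DIMENSION_TABLE.get)
enriched = [lookup.enrich(f) for f in [{'id': 10, 'category_id': 1},
                                       {'id': 11, 'category_id': 1},
                                       {'id': 12, 'category_id': 2}]]
```

Note the trade-off Fabian points out: each cache miss is a blocking call into MySQL, so throughput depends on lookup latency; the cache also means the job may serve stale dimension values if the MySQL table changes.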