Good to hear that the patch resolved your issue; looking forward to hearing more feedback from you!
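
For the record, once the patch lands, the Option 4 rewrite of your query should look roughly like the sketch below. This is only a sketch: it assumes temporal table functions Dim1 and Dim2 have been registered (via the Table API) over the DimensionAtJoinTimeX tables with uid as the key, and that Event carries a processing-time attribute named proctime; note also the inner-join form, which is what temporal table function joins currently support, so the original LEFT JOINs would need separate handling.

```sql
-- Sketch only: Dim1/Dim2 are hypothetical temporal table functions
-- registered over DimensionAtJoinTime1/2 with uid as the key;
-- e.proctime is an assumed processing-time attribute on Event.
-- Each Event row joins the dimension state as of its own processing
-- time, so later dimension updates do not retrigger the join.
SELECT *
FROM Event e,
     LATERAL TABLE (Dim1(e.proctime)) AS d1,
     LATERAL TABLE (Dim2(e.proctime)) AS d2
WHERE e.uid = d1.uid
  AND e.uid = d2.uid
```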
Best,
Kurt

On Mon, Jan 6, 2020 at 5:56 AM Benoît Paris <benoit.pa...@centraliens-lille.org> wrote:

> Hi Kurt,
>
> Thank you for your answer.
>
> Yes, both fact tables and dimension tables are changing over time; the
> example was meant to illustrate that they could change at the same time,
> but that we could still make a JOIN essentially ignore updates from one
> specified side. The SQL is not the actual one I'm using, and as you said
> later on, I indeed don't deal with a time attribute and just want what's
> in the table at processing time.
>
> At the moment my problem seems well on its way to being resolved, and the
> solution is going to be Option 4, "LATERAL TABLE table_function" on the
> Blink planner, as Jark Wu seems to be (elegantly) providing a patch for
> the FLINK-14200 NPE bug:
> https://github.com/apache/flink/pull/10763
> It was indeed about shenanigans in finding the proper RelOptSchema. Ah, I
> wish I had dived into the source code sooner; I could have had the
> pleasant opportunity to contribute to the Flink codebase.
>
> Anyway, shout out to Jark for resolving the bug and providing a patch! I
> believe this will be a real enabler for CQRS architectures on Flink (we
> had subscriptions with regular joins, and this patch enables querying the
> same thing with very minor SQL modifications).
>
> Kind regards,
> Benoît
>
>
> On Sat, Jan 4, 2020 at 4:22 AM Kurt Young <ykt...@gmail.com> wrote:
>
>> Hi Benoît,
>>
>> Before discussing all the options you listed, I'd like to understand
>> more about your requirements.
>>
>> The part I don't fully understand is: both your fact (Event) and
>> dimension (DimensionAtJoinTimeX) tables are derived from the same table,
>> Event or EventRawInput in your case. As a result, both your fact and
>> dimension tables are changing with time.
>>
>> My understanding is that once your DimensionAtJoinTimeX table emits a
>> result, you don't want that result to change again.
>> You want the fact table to join only whatever data the dimension table
>> currently has? I'm asking because your dimension table is calculated
>> with a window aggregation, but your join logic doesn't seem to care
>> about the time attribute (LEFT JOIN DimensionAtJoinTime1 d1 ON e.uid =
>> d1.uid). It's possible that when a record with uid=x comes from the
>> Event table, the dimension table doesn't have any data around uid=x yet
>> due to the window aggregation. In that case, you don't want them to
>> join?
>>
>> Best,
>> Kurt
>>
>>
>> On Fri, Jan 3, 2020 at 1:11 AM Benoît Paris <benoit.pa...@centraliens-lille.org> wrote:
>>
>>> Hello all!
>>>
>>> I'm trying to design a stream pipeline, and I'm having trouble
>>> controlling when a JOIN triggers an update.
>>>
>>> Setup:
>>>
>>> - The Event table: the "probe side" / "query side", the result of
>>>   earlier stream processing.
>>> - The DimensionAtJoinTimeX tables: of an updating nature, the "build
>>>   side", the results of earlier stream processing.
>>>
>>> Joining them:
>>>
>>> SELECT *
>>> FROM Event e
>>> LEFT JOIN DimensionAtJoinTime1 d1
>>>   ON e.uid = d1.uid
>>> LEFT JOIN DimensionAtJoinTime2 d2
>>>   ON e.uid = d2.uid
>>>
>>> The DimensionAtJoinTimeX tables being the result of earlier stream
>>> processing, possibly from the same Event table:
>>>
>>> SELECT uid,
>>>        hop_start(...),
>>>        sum(...)
>>> FROM Event e
>>> GROUP BY uid,
>>>          hop(...)
>>>
>>> The Event table being:
>>>
>>> SELECT ...
>>> FROM EventRawInput i
>>> WHERE i.some_field = 'some_value'
>>>
>>> Requirements:
>>>
>>> - I need the JOINs to be executed only once, and only when a new row
>>>   is appended to the probe / query / Event table.
>>> - I also need the full pipeline to be defined in SQL.
>>> - I very strongly prefer the Blink planner (mainly for its
>>>   Deduplication, TopN and LAST_VALUE features).
>>>
>>> Problem exploration so far:
>>>
>>> - Option 1, "FOR SYSTEM_TIME AS OF" [1]: I need to have the solution
>>>   in SQL, so it doesn't work out.
>>> But I might explore the following: insert DimensionAtJoinTimeX into a
>>>   special sink, and use it in a LookupableTableSource (I'm at a loss
>>>   on how to do that, though. Do I need an external key-value store?).
>>> - Option 2, "FOR SYSTEM_TIME AS OF" [1], used in SQL: is there a
>>>   version of "FOR SYSTEM_TIME AS OF" readily usable in SQL? I might
>>>   have missed something in the documentation.
>>> - Option 3, "LATERAL TABLE table_function" [2], on the Legacy planner:
>>>   it does not work with two tables [3], and I don't get the Blink
>>>   planner features.
>>> - Option 4, "LATERAL TABLE table_function" [2], on the Blink planner:
>>>   it does not work when the "probe side" is the result of earlier
>>>   stream processing [4].
>>> - Option 5: let a regular JOIN materialize the updates, and somehow
>>>   find a way to filter out the ones coming from the build sides (I'm
>>>   at a loss on how to do that).
>>> - Option 6, "TVR": I read this paper [5], which mentions "Time-Varying
>>>   Relations". Speculating here: could there be a way to say that the
>>>   build side is not a TVR, i.e. declare the stream as somehow "static"
>>>   while still being updated? (But I guess we're back to "FOR
>>>   SYSTEM_TIME AS OF".)
>>> - Option 7: are there features in development, or hints, or
>>>   workarounds to control the JOIN updates that I have not considered
>>>   so far?
>>> - Remark 1: I believe that FLINK-15112 and FLINK-14200 are bugs of the
>>>   same nature, even though they occur in different situations on
>>>   different planners (same exception stack trace, in files that have
>>>   the same historical parent before the Blink fork). FLINK-15112 has a
>>>   workaround, but FLINK-14200 does not. The existence of that
>>>   workaround IMHO signals that there is a simple fix for both bugs. I
>>>   have tried to find it in Flink for a few days, but with no success
>>>   so far. If you have pointers to help me provide a fix, I'll gladly
>>>   listen.
>>> So far I have worked out the following: it revolves around
>>>   Calcite-based Flink streaming rules transforming a temporal table
>>>   function correlate into a Join over two Scans, and it crashes when
>>>   it encounters something that is not a table that can be readily
>>>   scanned. There are also shenanigans in trying to find the right
>>>   schema in the Catalog. But I am blocked now, and not accustomed to
>>>   the Flink internal code (I would like to be, though; if
>>>   Alibaba/Ververica are recruiting remote workers, wink wink, nudge
>>>   nudge).
>>>
>>> All opinions very much welcomed on all options and remarks!
>>>
>>> Cheers, and a happy new year to all,
>>> Benoît
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.html#temporal-table
>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/joins.html#processing-time-temporal-joins
>>> [3] https://issues.apache.org/jira/browse/FLINK-15112
>>> [4] https://issues.apache.org/jira/browse/FLINK-14200
>>> [5] https://arxiv.org/pdf/1905.12133.pdf
>>
>
> --
> Benoît Paris
> Explainable Machine Learning Engineer
> Tél : +33 6 60 74 23 00
> http://benoit.paris
> http://explicable.ml
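
The "LATERAL TABLE table_function" pattern discussed above as Options 3 and 4 is the processing-time temporal join from [2]. A minimal sketch, loosely following the documentation's Rates/Orders example (all names here are taken from or modeled on the docs, not the thread's actual pipeline; the Rates temporal table function must first be registered over a rates-history table via the Table API, keyed on currency, with proctime as its time attribute):

```sql
-- Sketch of a processing-time temporal join, after the Flink 1.9 docs.
-- Rates is an assumed temporal table function registered over a
-- rates-history table (key: currency, time attribute: proctime).
-- Each order joins the rates valid at its own processing time; later
-- updates to the rates history do not retract previously emitted rows.
SELECT o.amount * r.rate AS converted_amount
FROM Orders AS o,
     LATERAL TABLE (Rates(o.proctime)) AS r
WHERE r.currency = o.currency
```

This "join once, at probe time" behavior is exactly the update-control property asked for in the requirements above, which is why Option 4 resolves the thread.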