Hi Abdelilah,

you are right that union does not work (well) in your case. I misunderstood
the relation between the two streams.

The ideal pattern would be a broadcast join imho [1]. I'm not sure how to do
it in Table API/SQL though, but I hope Timo can help here as well.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
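Untested, but a minimal sketch of the broadcast join in the DataStream API
could look like the following. The Car/Brand/CarWithBrand types, the
brandId field and the `cars`/`brands` streams are just placeholders based
on your example below; everything else is a guess on my side:

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Brands are broadcast, so every parallel instance sees all of them.
MapStateDescriptor<Integer, Brand> brandsDesc =
        new MapStateDescriptor<>("brands", Types.INT, Types.POJO(Brand.class));
BroadcastStream<Brand> brandBroadcast = brands.broadcast(brandsDesc);

DataStream<CarWithBrand> joined = cars
        .keyBy(car -> car.brandId)
        .connect(brandBroadcast)
        .process(new KeyedBroadcastProcessFunction<Integer, Car, Brand, CarWithBrand>() {

            @Override
            public void processElement(Car car, ReadOnlyContext ctx,
                    Collector<CarWithBrand> out) throws Exception {
                Brand brand = ctx.getBroadcastState(brandsDesc).get(car.brandId);
                if (brand != null) {
                    out.collect(new CarWithBrand(car, brand));
                }
                // A car arriving before its brand would have to be buffered
                // in keyed state and flushed once the brand shows up.
            }

            @Override
            public void processBroadcastElement(Brand brand, Context ctx,
                    Collector<CarWithBrand> out) throws Exception {
                // Last version wins, which also covers updates to a brand.
                ctx.getBroadcastState(brandsDesc).put(brand.brandId, brand);
            }
        });

Note that re-emitting previously seen cars when a brand changes would need
the buffered cars from keyed state (see ctx.applyToKeyedState()), so this
is only a starting point.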
On Thu, Feb 11, 2021 at 7:00 PM Timo Walther <twal...@apache.org> wrote:

> After thinking about this topic again, I think UNION ALL will not solve
> the problem, because you would need to group by brandId and perform the
> joining within the aggregate function, which could also be quite
> expensive.
>
> Regards,
> Timo
>
> On 11.02.21 17:16, Timo Walther wrote:
> > Hi Abdelilah,
> >
> > At first glance your logic seems to be correct. But Arvid is right
> > that your pipeline might not have the optimal performance that Flink
> > can offer, due to the 3 groupBy operations. I'm wondering what the
> > optimizer produces out of this plan. Maybe you can share it with us
> > using `table.explain()` on the final table?
> >
> > I think what Arvid meant is a UNION ALL in SQL. You would normalize
> > the two streams into a CarWithBrand first (containing nulls for the
> > other side), and then groupBy/aggregate to the last value and filter
> > out invalid CarWithBrands.
> >
> > If the DataStream API is an option for you, I would consider using
> > the `connect()` method. A connect function can be stateful, so you
> > might reduce your state size further. In your current implementation,
> > the join operator will store all input tables for processing, which
> > means car and brand state is stored twice.
> >
> > Regards,
> > Timo
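To make the connect() idea a bit more concrete: a rough, untested sketch
with a KeyedCoProcessFunction could look like this. Again, the
Car/Brand/CarWithBrand types, the brandId/carId fields and the
`cars`/`brands` streams are placeholders from the example below:

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

DataStream<CarWithBrand> joined = cars
        .keyBy(car -> car.brandId)
        .connect(brands.keyBy(brand -> brand.brandId))
        .process(new KeyedCoProcessFunction<Integer, Car, Brand, CarWithBrand>() {

            private transient ValueState<Brand> brandState;      // latest Brand for this brandId
            private transient MapState<Integer, Car> carsState;  // latest Car per carId for this brandId

            @Override
            public void open(Configuration parameters) {
                brandState = getRuntimeContext().getState(
                        new ValueStateDescriptor<>("brand", Brand.class));
                carsState = getRuntimeContext().getMapState(
                        new MapStateDescriptor<>("cars", Integer.class, Car.class));
            }

            @Override
            public void processElement1(Car car, Context ctx,
                    Collector<CarWithBrand> out) throws Exception {
                carsState.put(car.carId, car);   // upsert: keep only the latest car version
                Brand brand = brandState.value();
                if (brand != null) {             // matching brand already known -> emit
                    out.collect(new CarWithBrand(car, brand));
                }                                // otherwise no output until the brand arrives
            }

            @Override
            public void processElement2(Brand brand, Context ctx,
                    Collector<CarWithBrand> out) throws Exception {
                brandState.update(brand);        // upsert: new brand or updated brand fields
                for (Car car : carsState.values()) {
                    out.collect(new CarWithBrand(car, brand));  // re-emit all cars of this brand
                }
            }
        });

Since both sides are keyed by brandId, an updated brand re-emits all cars
stored for that key, which should match the Time 5 behaviour described
below.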
> > On 11.02.21 16:06, Abdelilah CHOUKRI wrote:
> > > Thank you Arvid for the reply,
> > >
> > > In fact, it's not a union of the same data. I'll try to explain what
> > > we want to achieve as a concept:
> > >
> > > We have 2 data sources with two different schemas, but with a common
> > > field/attribute (example: brandId):
> > > - Cars: receives data entries with high frequency; one Car can only
> > >   be related to one Brand (via the field brandId).
> > > - Brands: receives data entries with high frequency; one Brand can
> > >   be related to many Cars (via the field brandId).
> > > And we need to "merge" these data into a single output: CarWithBrand.
> > >
> > > I'll try to explain the behaviour that we want to achieve with the
> > > following diagram:
> > >
> > > [attached image: flink_flow.png]
> > >
> > > - Time 1: we have a Car and a Brand matching by brandId, so the
> > >   output should return the corresponding CarWithBrand.
> > > - Time 2: we have a new Car that also matches the previous Brand,
> > >   so we output a CarWithBrand.
> > > - Time 3: we receive a new Car, but it does not match any existing
> > >   Brand, so there is no output.
> > > - Time 4: we have a new Car that matches the previous Brand and, at
> > >   the same time, we receive a new Brand that matches the previous
> > >   Car, so we should have two outputs.
> > > - Time 5: we receive an existing Brand, but with an updated field
> > >   (in this case the name), so we have to replace the previous Brand
> > >   with that brandId, and if there are any previously matching Cars,
> > >   we have to output all the corresponding CarWithBrands with the
> > >   changed field.
> > >
> > > So, we're using Flink Tables during the process to maintain the
> > > latest status of the data regardless of time.
> > >
> > > Furthermore, here's a simplified Java code example that represents
> > > what we've achieved so far: flink_join.java (attached)
> > >
> > > How would you recommend to achieve this with Flink?
> > > Is our approach adequate?
> > >
> > > Thank you.
> > >
> > > On Thu, Feb 11, 2021 at 11:50 AM Arvid Heise <ar...@apache.org> wrote:
> > >
> > > > Hi Abdelilah,
> > > >
> > > > I think your approach is overly complicated (and probably slow), but
> > > > I might have misunderstood things. Naively, I'd assume that you just
> > > > want to union stream 1 and stream 2 instead of joining. Note that
> > > > for union the events must have the same schema, so you most likely
> > > > want a select on each stream before the union. Summarizing:
> > > >
> > > > Table3 = (select id, title, description from Table 1)
> > > >          union
> > > >          (select id, title, description from Table 2)
> > > >
> > > > If you use a retract stream, you probably do not need the grouping
> > > > and last-value selection as well.
> > > >
> > > > On Mon, Feb 8, 2021 at 3:33 PM Abdelilah CHOUKRI
> > > > <abdelilah.chou...@prt.manomano.com> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > We're trying to use the Flink 1.11 Java Table API to process a
> > > > > streaming use case:
> > > > >
> > > > > We have 2 streams, each one with a different structure. Both
> > > > > events, coming from Kafka, can be:
> > > > > - A new event (not in the system already)
> > > > > - An updated event (updating an event that was previously
> > > > >   inserted)
> > > > > so we only want to store the latest data in the Table.
> > > > >
> > > > > We need to join the 2 previous Tables to have all this data
> > > > > stored in the Flink system. We think that the best way is to
> > > > > store the joined data as a Table: a Flink Table that is a join
> > > > > of the 2 tables by a common key.
> > > > >
> > > > > To sum up, we have:
> > > > > - Stream 1 (coming from a Kafka topic) -> Flink Table 1
> > > > > - Stream 2 (coming from a Kafka topic) -> Flink Table 2
> > > > > - Table 3 = Table 1 join Table 2
> > > > > - DataStream using a retract stream of Table 3
> > > > >
> > > > > To get the last element in Table 1 and Table 2, we are using
> > > > > functions (LastValueAggFunction):
> > > > >
> > > > > streamTableEnvironment.registerFunction("LAST_VALUE_STRING",
> > > > >     new LastValueAggFunction.StringLastValueAggFunction());
> > > > > ...
> > > > > streamTableEnvironment.fromDataStream(inputDataStream)
> > > > >     .groupBy($("id"))
> > > > >     .select(
> > > > >         $("id").as("o_id"),
> > > > >         call("LAST_VALUE_STRING", $("title")).as("o_title"),
> > > > >         call("LAST_VALUE_STRING", $("description")).as("o_description"));
> > > > >
> > > > > The questions are:
> > > > > - Is our approach correct to get the data stored in the Flink
> > > > >   system?
> > > > > - Is it necessary to use the LastValueAggFunction in our case?
> > > > >   We want to retract the stream to our custom Pojo instead of
> > > > >   Row, but we're getting the attached error (attached:
> > > > >   stack_trace.log).
> > > > >
> > > > > Abdelilah Choukri,
> > > > > Backend dev at ManoMano.
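Regarding the retract-to-POJO question at the very bottom: hard to tell
without the stack trace, but the conversion usually only works when the
target class is a valid Flink POJO (public class, public no-arg
constructor, public fields or getters/setters) and its field names match
the columns of the table. A minimal, untested sketch, with `table3` and
CarWithBrand named as in the example:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;

// true = insert/update, false = retraction of a previously emitted result
DataStream<Tuple2<Boolean, CarWithBrand>> retractStream =
        streamTableEnvironment.toRetractStream(table3, CarWithBrand.class);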