Hi Abdelilah,

You are right that union does not work (well) in your case. I misunderstood
the relation between the two streams.

IMHO, the ideal pattern would be a broadcast join [1]. I'm not sure how to
do it in the Table API/SQL, though; I hope Timo can help here as well.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
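
To make the target semantics concrete, here is a minimal plain-Java sketch (no Flink dependencies) of the state such a join would have to keep per brandId, following the Time 1 to Time 5 description in the quoted mail below. The class and method names (CarBrandJoin, onCar, onBrand) and the string output format are made up for illustration. In a real job the two maps would live in Flink keyed or broadcast state inside a two-input function, not in HashMaps, so treat this as a sketch of the logic only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java simulation of the Car/Brand join logic; hypothetical names, not Flink API. */
public class CarBrandJoin {
    // Latest brand name per brandId.
    private final Map<String, String> brands = new HashMap<>();
    // Latest car title per carId, grouped by brandId (insertion-ordered for stable output).
    private final Map<String, Map<String, String>> carsByBrand = new HashMap<>();

    /**
     * Upserts a car and emits one joined record if its brand is already known.
     * Simplification: a car that changes its brandId is not removed from the old brand.
     */
    public List<String> onCar(String carId, String brandId, String title) {
        carsByBrand.computeIfAbsent(brandId, k -> new LinkedHashMap<>()).put(carId, title);
        List<String> out = new ArrayList<>();
        String brandName = brands.get(brandId);
        if (brandName != null) {
            out.add(format(carId, title, brandName));
        }
        return out;
    }

    /** Upserts a brand and re-emits a joined record for every car already stored for it. */
    public List<String> onBrand(String brandId, String name) {
        brands.put(brandId, name);
        List<String> out = new ArrayList<>();
        Map<String, String> cars =
                carsByBrand.getOrDefault(brandId, Collections.<String, String>emptyMap());
        for (Map.Entry<String, String> car : cars.entrySet()) {
            out.add(format(car.getKey(), car.getValue(), name));
        }
        return out;
    }

    // Hypothetical stand-in for building a CarWithBrand object.
    private static String format(String carId, String title, String brandName) {
        return carId + ":" + title + "@" + brandName;
    }

    public static void main(String[] args) {
        CarBrandJoin join = new CarBrandJoin();
        System.out.println(join.onBrand("b1", "Acme"));        // brand arrives first
        System.out.println(join.onCar("c1", "b1", "Roadster")); // car matches known brand
        System.out.println(join.onCar("c2", "b2", "Van"));      // no brand yet
        System.out.println(join.onBrand("b1", "Acme Motors"));  // brand update
    }
}
```

The point of the sketch is that each side is stored once and a Brand update fans out to all stored Cars, which is the behaviour the Table API join plus LAST_VALUE grouping is trying to reproduce.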

On Thu, Feb 11, 2021 at 7:00 PM Timo Walther <twal...@apache.org> wrote:

> After thinking about this topic again, I think UNION ALL will not solve
> the problem because you would need to group by brandId and perform the
> joining within the aggregate function which could also be quite expensive.
>
> Regards,
> Timo
>
> On 11.02.21 17:16, Timo Walther wrote:
> > Hi Abdelilah,
> >
> > At first glance your logic seems to be correct. But Arvid is right
> > that your pipeline might not have the optimal performance that Flink can
> > offer due to the 3 groupBy operations. I'm wondering what the optimizer
> > produces out of this plan. Maybe you can share it with us using
> > `table.explain()` on the final table?
> >
> > I think what Arvid meant is a UNION ALL in SQL. You would normalize the
> > two streams into a CarWithBrand before (containing nulls for the other
> > side), and then groupBy/aggregate to the last value and filter out
> > invalid CarWithBrands.
> >
> > If DataStream API is an option for you I would consider using the
> > `connect()` method. A connect function can be stateful and you might
> > reduce your state size further. In your current implementation, the join
> > operator will store all input tables for processing. This means car and
> > brand state is stored twice.
> >
> > Regards,
> > Timo
> >
> > On 11.02.21 16:06, Abdelilah CHOUKRI wrote:
> >> Thank you Arvid for the reply,
> >>
> >> In fact, it's not a union of the same data, I'll try to explain what
> >> we want to achieve as a concept:
> >>
> >> We have 2 data sources, with two different schemas, but with a common
> >> field/attribute (example: /brandId/),
> >> - /Cars/: receive data entries with high frequency, one /Car/ can only
> >> be related to one /Brand/. (with the field /brandId/)
> >> - /Brands/: receive data entries with high frequency, one /Brand/ can
> >> be related to many /Cars/. (with the field /brandId/)
> >> And we need to "merge" these data in a single output: /CarWithBrand/.
> >>
> >> I'll try to explain the behaviour that we want to achieve with the
> >> following diagram:
> >>
> >> flink_flow.png
> >>
> >> - Time 1: we have a /Car/ and a /Brand/ matching by /brandId/, so the
> >> output should return a corresponding /CarWithBrand/.
> >> - Time 2: we have a new /Car/ that also matches the previous /Brand/,
> >> so we output a /CarWithBrand/.
> >> - Time 3: we receive a new /Car/, but it does not match any existing
> >> /Brand/, so no output.
> >> - Time 4: we have a new Car that matches the previous Brand, and we
> >> also receive a new Brand that matches the previous Car, so we should
> >> have two outputs.
> >> - Time 5: we receive an existing Brand, but with an updated field (in
> >> this case the name), so we have to replace the previous Brand with
> >> that brandId and, if there are any previously matching Cars, we have
> >> to output all the corresponding CarWithBrand with the changed field.
> >>
> >> So, we're using Flink Tables during the process, to maintain the
> >> latest status of the data regardless of time.
> >>
> >> Furthermore, here's a simplified Java code example that represents
> >> what we've achieved so far: *flink_join.java*
> >>
> >> How would you recommend achieving this with Flink?
> >> Is our approach adequate?
> >>
> >> Thank you.
> >>
> >> On Thu, Feb 11, 2021 at 11:50 AM Arvid Heise <ar...@apache.org
> >> <mailto:ar...@apache.org>> wrote:
> >>
> >>     Hi Abdelilah,
> >>
> >>     I think your approach is overly complicated (and probably slow) but
> >>     I might have misunderstood things. Naively, I'd assume that you just
> >>     want to union stream 1 and stream 2 instead of joining. Note that
> >>     for union the events must have the same schema, so you most likely
> >>     want to have a select on each stream before union. Summarizing:
> >>     Table3 = (select id, title, description from Table 1) union (select
> >>     id, title, description from Table 2)
> >>
> >>     If you use a retract stream, you probably do not need the
> >>     grouping and last value selection either.
> >>
> >>     On Mon, Feb 8, 2021 at 3:33 PM Abdelilah CHOUKRI
> >>     <abdelilah.chou...@prt.manomano.com
> >>     <mailto:abdelilah.chou...@prt.manomano.com>> wrote:
> >>
> >>         Hi,
> >>
> >>         We're trying to use the Flink 1.11 Java Table API to process a
> >>         streaming use case:
> >>
> >>         We have 2 streams, each one with different structures. Both
> >>         events, coming from Kafka, can be:
> >>         - A new event (not in the system already)
> >>         - An updated event (updating an event that was previously
> >>         inserted)
> >>         so we only want to store the latest data in the Table.
> >>
> >>         We need to join the 2 previous Tables to have all this data
> >>         stored in the Flink system. We think that the best way is to
> >>         store joined data as a Table.
> >>         This is going to be a Flink Table, that will be a join of the 2
> >>         tables by a common key.
> >>
> >>         To sum up, we have:
> >>         - Stream 1 (coming from Kafka topic) -> Flink Table 1
> >>         - Stream 2 (coming from Kafka topic) -> Flink Table 2
> >>         - Table 3 = Table 1 join Table 2
> >>         - DataStream using RetractStream of Table 3
> >>
> >>         To get the last element in Table 1 and Table 2, we are using
> >>         Functions (LastValueAggFunction):
> >>
> >>         streamTableEnvironment.registerFunction(
> >>             "LAST_VALUE_STRING",
> >>             new LastValueAggFunction.StringLastValueAggFunction());
> >>         ...
> >>         streamTableEnvironment.fromDataStream(inputDataStream)
> >>             .groupBy($("id"))
> >>             .select(
> >>                 $("id").as("o_id"),
> >>                 call("LAST_VALUE_STRING", $("title")).as("o_title"),
> >>                 call("LAST_VALUE_STRING", $("description")).as("o_description")
> >>             );
> >>
> >>
> >>         The questions are:
> >>         - Is our approach correct to get the data stored in the Flink
> >>         system?
> >>         - Is it necessary to use the /LastValueAggFunction/ in our
> >>         case? We want to retract the stream to our custom POJO
> >>         instead of /Row/, but we're getting the attached error
> >>         (attached: *stack_trace.log*)
> >>
> >>
> >>         Abdelilah Choukdi,
> >>         Backend dev at ManoMano.
> >>
> >
>
>
