Thank you all for the interest, feedback and advice.
Just to clarify further why we used tables with grouping:
From each DataStream we are only interested in the last updated or new Event.
Also, we need to have ALL the previous Events stored in order to identify
whether the incoming event is new or updated;
duplicated events (same data/fields as the stored ones) will be ignored.
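To make the intent concrete, here is a minimal plain-Java sketch (no Flink involved) of the per-id behaviour we are after. The class name LatestEventStore, the Change enum and the String payload are hypothetical and only for illustration; in a real job the map would be Flink-managed state rather than a HashMap.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model (no Flink): keep the latest event per id and classify each arrival. */
final class LatestEventStore {
    enum Change { NEW, UPDATED, DUPLICATE }

    // id -> last seen payload; stands in for keyed state and grows with the number of distinct ids
    private final Map<Long, String> latestById = new HashMap<>();

    /** Stores the payload if it is new or changed, and reports which case applied. */
    Change accept(long id, String payload) {
        String previous = latestById.get(id);
        if (previous == null) {
            latestById.put(id, payload);
            return Change.NEW;
        }
        if (previous.equals(payload)) {
            return Change.DUPLICATE; // same fields as the stored event: ignore it
        }
        latestById.put(id, payload);
        return Change.UPDATED;
    }

    public static void main(String[] args) {
        LatestEventStore cars = new LatestEventStore();
        System.out.println(cars.accept(1L, "Clio"));   // first time we see id 1
        System.out.println(cars.accept(1L, "Clio"));   // identical payload again
        System.out.println(cars.accept(1L, "Clio V")); // same id, changed payload
    }
}
```

Only the NEW and UPDATED cases would be forwarded downstream; DUPLICATE arrivals are dropped without touching the stored value.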
So as we understood - and please correct us if we're wrong - we can
achieve this behaviour with the following steps:
*_step 1:_* Register the LastValueFunctions for each column type, so that
we store only the last incoming Event;
when we later convert the table to a retract stream, we can filter for the
Events that have been changed/updated (step 4).
    StreamTableEnvironment streamTableEnvironment =
        StreamTableEnvironment.create(streamExecutionEnvironment, ...);
    streamTableEnvironment.registerFunction("LAST_VALUE_STRING",
        new LastValueAggFunction.StringLastValueAggFunction());
    streamTableEnvironment.registerFunction("LAST_VALUE_LONG",
        new LastValueAggFunction.LongLastValueAggFunction());
    streamTableEnvironment.registerFunction("LAST_VALUE_BOOLEAN",
        new LastValueAggFunction.BooleanLastValueAggFunction());
*_step 2:_* Convert each stream to a Flink Table and group it by Id to keep
the latest data regardless of any time window (as if the Id were a primary
key in a SQL table):
    Table carTable = streamTableEnvironment.fromDataStream(carStream)
        .groupBy($("carId"))
        .select(
            $("carId").as("c_carId"),
            call("LAST_VALUE_LONG", $("brandId")).as("c_brandId"),
            call("LAST_VALUE_LONG", $("serialNumber")).as("c_serialNumber"),
            call("LAST_VALUE_STRING", $("carName")).as("c_carName")
        );
*_step 3:_* Join both tables by the common Id (brandId), and group the
result by another Id (carId) to merge the data from both sides:
    Table brandCarTable = carTable.join(brandTable)
        .where($("c_brandId").isEqual($("b_brandId")))
        .groupBy($("c_carId"))
        .select(
            $("c_carId").as("carId"),
            call("LAST_VALUE_LONG", $("b_brandId")).as("brandId"),
            call("LAST_VALUE_LONG", $("c_serialNumber")).as("serialNumber"),
            call("LAST_VALUE_STRING", $("c_carName")).as("carName"),
            call("LAST_VALUE_STRING", $("b_brandName")).as("brandName")
        );
*_step 4:_* Convert the joined/grouped data to a retract stream, and filter
by the boolean `flaggedJoin.f0`; as we understood, only the new/updated
Events will be flagged `true`.
    DataStream<BrandCar> brandCarStream =
        streamTableEnvironment.toRetractStream(brandCarTable, BrandCar.class)
            .filter(flaggedJoin -> flaggedJoin.f0)
            .map(changedJoin -> changedJoin.f1)
            .flatMap(...);
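For reference, this is how we understand the retract flag, written as a plain-Java model rather than Flink code. An update of an existing key is modelled as a retract message (flag false, old value) followed by an add message (flag true, new value), so filtering on the flag keeps only the new values. The names RetractModel and Message are hypothetical, and the assumption that an unchanged value emits nothing is ours and should be checked against the real plan.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model (no Flink) of the messages a grouped last-value query sends to a retract stream. */
final class RetractModel {
    /** One message: add == true is an add, add == false is a retraction (like Tuple2<Boolean, T>). */
    static final class Message {
        final boolean add;
        final String key;
        final String value;

        Message(boolean add, String key, String value) {
            this.add = add;
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return "(" + add + ", " + key + "=" + value + ")";
        }
    }

    // key -> last value seen for that key
    private final Map<String, String> lastValueByKey = new HashMap<>();

    /** Returns the messages produced by one incoming event. */
    List<Message> onEvent(String key, String value) {
        List<Message> out = new ArrayList<>();
        String previous = lastValueByKey.put(key, value);
        if (value.equals(previous)) {
            return out; // ASSUMPTION: an unchanged result emits nothing
        }
        if (previous != null) {
            out.add(new Message(false, key, previous)); // retract the old result
        }
        out.add(new Message(true, key, value)); // add the new result
        return out;
    }

    /** Same effect as the filter on f0 in step 4: keep only add messages. */
    static List<Message> addsOnly(List<Message> messages) {
        List<Message> out = new ArrayList<>();
        for (Message m : messages) {
            if (m.add) {
                out.add(m);
            }
        }
        return out;
    }
}
```

In this model a first event for a key yields one add message, an update yields a retract plus an add, and a repeated identical event yields nothing.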
- Have we misunderstood the usage of LastValueFunctions ?
- Could we achieve the same with only DataStreams ? (without using Tables)
- If we switch to DataStreams, how can we store all the previous events
regardless of Time (without a Window) ?
- You seem to be concerned about the performance of the groupings, is it
a concern regardless of what we use ? (DataStreams or Tables)
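To illustrate the DataStream question, here is a rough plain-Java model of the state such a job would have to keep without any Window. We assume that in Flink the two maps would become keyed state in a function applied after connect() and a keyBy on brandId; this is only a sketch of the logic, not tested Flink code. The class name CarBrandJoinModel and the "carId:carName:brandName" output format are hypothetical, and the sketch assumes a car never changes its brandId.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model (no Flink) of a stateful two-input join of Cars and Brands on brandId. */
final class CarBrandJoinModel {
    // brandId -> latest brand name
    private final Map<Long, String> brandNames = new HashMap<>();
    // brandId -> (carId -> latest car name); TreeMap keeps the output order deterministic
    private final Map<Long, Map<Long, String>> carsByBrand = new HashMap<>();

    /** A car arrives: store it, and emit one joined record if it changed and its brand is known. */
    List<String> onCar(long carId, long brandId, String carName) {
        Map<Long, String> cars = carsByBrand.computeIfAbsent(brandId, k -> new TreeMap<>());
        String previous = cars.put(carId, carName);
        String brandName = brandNames.get(brandId);
        if (carName.equals(previous) || brandName == null) {
            return Collections.emptyList(); // duplicate car, or no matching brand yet
        }
        List<String> out = new ArrayList<>();
        out.add(format(carId, carName, brandName));
        return out;
    }

    /** A brand arrives: store it, and re-emit every stored car of that brand if the brand changed. */
    List<String> onBrand(long brandId, String brandName) {
        String previous = brandNames.put(brandId, brandName);
        List<String> out = new ArrayList<>();
        if (brandName.equals(previous)) {
            return out; // duplicate brand: ignore
        }
        Map<Long, String> cars =
            carsByBrand.getOrDefault(brandId, Collections.<Long, String>emptyMap());
        for (Map.Entry<Long, String> car : cars.entrySet()) {
            out.add(format(car.getKey(), car.getValue(), brandName));
        }
        return out;
    }

    private static String format(long carId, String carName, String brandName) {
        return carId + ":" + carName + ":" + brandName;
    }
}
```

Each car and each brand is stored once here, and the state grows with the number of distinct ids rather than with time.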
Thank you again, we're checking your suggestion about Broadcast.
On Thu, Feb 11, 2021 at 9:28 PM Arvid Heise <ar...@apache.org
<mailto:ar...@apache.org>> wrote:
Hi Abdelilah,
you are right that union does not work (well) in your case. I
misunderstood the relation between the two streams.
The ideal pattern would be a broadcast join imho. [1] I'm not sure
how to do it in Table API/SQL though, but I hope Timo can help here
as well.
[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
On Thu, Feb 11, 2021 at 7:00 PM Timo Walther <twal...@apache.org
<mailto:twal...@apache.org>> wrote:
After thinking about this topic again, I think UNION ALL will not solve
the problem because you would need to group by brandId and perform the
joining within the aggregate function which could also be quite
expensive.
Regards,
Timo
On 11.02.21 17:16, Timo Walther wrote:
> Hi Abdelilah,
>
> at a first glance your logic seems to be correct. But Arvid is right
> that your pipeline might not have the optimal performance that Flink can
> offer due to the 3 groupBy operations. I'm wondering what the optimizer
> produces out of this plan. Maybe you can share it with us using
> `table.explain()` on the final table?
>
> I think what Arvid meant is a UNION ALL in SQL. You would normalize the
> two streams into a CarWithBrand before (containing nulls for the other
> side), and then groupBy/aggregate to the last value and filter out
> invalid CarWithBrands.
>
> If DataStream API is an option for you I would consider using the
> `connect()` method. A connect function can be stateful and you might
> reduce your state size further. In your current implementation, the join
> operator will store all input tables for processing. This means car and
> brand state is stored twice.
>
> Regards,
> Timo
>
> On 11.02.21 16:06, Abdelilah CHOUKRI wrote:
>> Thank you Arvid for the reply,
>>
>> In fact, it's not a union of the same data, I'll try to explain what
>> we want to achieve as a concept:
>>
>> We have 2 data sources, with two different schemas, but with a common
>> field/attribute (example: /brandId/),
>> - /Cars/: receive data entries with high frequency, one /Car/ can only
>> be related to one /Brand/. (with the field /brandId/)
>> - /Brands/: receive data entries with high frequency, one /Brand/ can
>> be related to many /Cars/. (with the field /brandId/)
>> And we need to "merge" these data in a single output: /CarWithBrand/.
>>
>> I'll try to explain the behaviour that we want to achieve with the
>> following diagram:
>>
>> flink_flow.png
>>
>> - Time 1: we have a /Car/ and a /Brand/ matching by /brandId/, so the
>> output should return a corresponding /CarWithBrand/.
>> - Time 2: we have a new /Car/, and it also matches the previous /Brand/,
>> so we output a /CarWithBrand/.
>> - Time 3: we receive a new /Car/, but it does not match any existing
>> /Brand/, so no output.
>> - Time 4: we have a new Car that matches the previous brand, and on
>> the other hand, we received a new Brand that matches the previous Car,
>> so we should have two outputs.
>> - Time 5: we receive an existing brand, but with an updated field (in
>> this case the name), so we have to replace the previous Brand with
>> that brandId, and if there are any previous matching Cars, we have to
>> output all the corresponding CarWithBrand with the changed field.
>>
>> So, we're using Flink Tables during the process, to maintain the
>> latest status of the data regardless of time.
>>
>> And furthermore, here's a simplified java code example that represents
>> what we've achieved so far: *flink_join.java*
>>
>> How would you recommend to achieve this with Flink ?
>> Is our approach adequate ?
>>
>> Thank you.
>>
>> On Thu, Feb 11, 2021 at 11:50 AM Arvid Heise
<ar...@apache.org <mailto:ar...@apache.org>
>> <mailto:ar...@apache.org <mailto:ar...@apache.org>>> wrote:
>>
>> Hi Abdelilah,
>>
>> I think your approach is overly complicated (and probably slow) but
>> I might have misunderstood things. Naively, I'd assume that you just
>> want to union stream 1 and stream 2 instead of joining. Note that
>> for union the events must have the same schema, so you most likely
>> want to have a select on each stream before union. Summarizing:
>> Table3 = (select id, title, description from Table 1) union (select
>> id, title, description from Table 2)
>>
>> If you use a retract stream, you probably do not need to use the
>> grouping and last value selection as well.
>>
>> On Mon, Feb 8, 2021 at 3:33 PM Abdelilah CHOUKRI
>> <abdelilah.chou...@prt.manomano.com
<mailto:abdelilah.chou...@prt.manomano.com>
>> <mailto:abdelilah.chou...@prt.manomano.com
<mailto:abdelilah.chou...@prt.manomano.com>>> wrote:
>>
>> Hi,
>>
>> We're trying to use Flink 1.11 Java tables API to process a
>> streaming use case:
>>
>> We have 2 streams, each one with different structures. Both
>> events, coming from Kafka, can be:
>> - A new event (not in the system already)
>> - An updated event (updating an event that previously was
>> inserted)
>> so we only want to store the latest data in the Table.
>>
>> We need to join the 2 previous Tables to have all this data
>> stored in the Flink system. We think that the best way is to
>> store joined data as a Table.
>> This is going to be a Flink Table, that will be a join of the 2
>> tables by a common key.
>>
>> To sum up, we have:
>> - Stream 1 (coming from Kafka topic) -> Flink Table 1
>> - Stream 2 (coming from Kafka topic) -> Flink Table 2
>> - Table 3 = Table 1 join Table 2
>> - DataStream using RetractStream of Table 3
>>
>> To get the last element in Table 1 and Table 2, we are using
>> Functions (LastValueAggFunction):
>>
>> streamTableEnvironment.registerFunction("LAST_VALUE_STRING",
>>     new LastValueAggFunction.StringLastValueAggFunction());
>> ...
>> streamTableEnvironment.fromDataStream(inputDataStream)
>>     .groupBy($("id"))
>>     .select(
>>         $("id").as("o_id"),
>>         call("LAST_VALUE_STRING", $("title")).as("o_title"),
>>         call("LAST_VALUE_STRING", $("description")).as("o_description")
>>     );
>>
>>
>> The questions are:
>> - Is our approach correct to get the data stored in the Flink
>> system?
>> - Is it necessary to use the _/LastValueAggFunction/_ in our
>> case ? as we want to retract the stream to
>> our custom Pojo instead of _/Row/_, but we're getting the
>> attached error: (attached: *stack_trace.log*)
>>
>>
>> Abdelilah Choukdi,
>> Backend dev at ManoMano.
>>
>